HBASE-7942. Makes the RegionServer aware of favored nodes. The region server can now specify where to create region files, provided the underlying HDFS supports the corresponding create(..favoredNodes..) API.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1485584 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 2013-05-23 06:01:27 +00:00
parent b5146ebf6e
commit 87a052f08d
14 changed files with 313 additions and 14 deletions
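At a glance, the patch threads an optional array of favored-node addresses from the region server's bookkeeping down through the store-file and HFile writer builders to the filesystem create call. A minimal caller-side sketch mirroring the HStore change below — the surrounding region, conf, cacheConf, fs, blocksize, and path objects are assumed to be in scope, not part of the patch:

    // Sketch only: the favored-nodes hint is optional; null means "no preference".
    InetSocketAddress[] favoredNodes = null;
    if (region.getRegionServerServices() != null) {
      // Ask the region server which datanodes are favored for this region.
      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
          region.getRegionInfo().getEncodedName());
    }
    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConf, fs, blocksize)
        .withFilePath(path)
        .withFavoredNodes(favoredNodes)   // new hook added by this patch
        .build();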

AbstractHFileWriter.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.hfile;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -261,9 +262,9 @@ public abstract class AbstractHFileWriter implements HFile.Writer {
   /** A helper method to create HFile output streams in constructors */
   protected static FSDataOutputStream createOutputStream(Configuration conf,
-      FileSystem fs, Path path) throws IOException {
+      FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
     FsPermission perms = FSUtils.getFilePermissions(fs, conf,
         HConstants.DATA_FILE_UMASK_KEY);
-    return FSUtils.create(fs, path, perms);
+    return FSUtils.create(fs, path, perms, favoredNodes);
   }
 }

HFile.java

@@ -25,6 +25,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.SequenceInputStream;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -338,6 +339,7 @@ public class HFile {
         HFile.DEFAULT_COMPRESSION_ALGORITHM;
     protected HFileDataBlockEncoder encoder = NoOpDataBlockEncoder.INSTANCE;
     protected KeyComparator comparator;
+    protected InetSocketAddress[] favoredNodes;
     protected ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
     protected int bytesPerChecksum = DEFAULT_BYTES_PER_CHECKSUM;
     protected boolean includeMVCCReadpoint = true;
@@ -390,6 +392,12 @@ public class HFile {
       return this;
     }
+
+    public WriterFactory withFavoredNodes(InetSocketAddress[] favoredNodes) {
+      // Deliberately not checking for null here.
+      this.favoredNodes = favoredNodes;
+      return this;
+    }

     public WriterFactory withChecksumType(ChecksumType checksumType) {
       Preconditions.checkNotNull(checksumType);
       this.checksumType = checksumType;
@@ -416,7 +424,7 @@
             "filesystem/path or path");
       }
       if (path != null) {
-        ostream = AbstractHFileWriter.createOutputStream(conf, fs, path);
+        ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes);
       }
       return createWriter(fs, path, ostream, blockSize,
           compression, encoder, comparator, checksumType, bytesPerChecksum, includeMVCCReadpoint);

HFileWriterV2.java

@@ -115,7 +115,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
       final KeyComparator comparator, final ChecksumType checksumType,
       final int bytesPerChecksum, final boolean includeMVCCReadpoint) throws IOException {
     super(cacheConf,
-        ostream == null ? createOutputStream(conf, fs, path) : ostream,
+        ostream == null ? createOutputStream(conf, fs, path, null) : ostream,
         path, blockSize, compressAlgo, blockEncoder, comparator);
     this.checksumType = checksumType;
     this.bytesPerChecksum = bytesPerChecksum;

FavoredNodesForRegion.java (new file)

@@ -0,0 +1,47 @@
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.net.InetSocketAddress;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName;

/**
 * Abstraction that allows different modules in RegionServer to update/get
 * the favored nodes information for regions.
 */
@InterfaceAudience.Private
interface FavoredNodesForRegion {
  /**
   * Used to update the favored nodes mapping when required.
   * @param encodedRegionName
   * @param favoredNodes
   */
  void updateRegionFavoredNodesMapping(String encodedRegionName, List<ServerName> favoredNodes);

  /**
   * Get the favored nodes mapping for this region. Used when the HDFS create API
   * is invoked to pass in favored nodes hints for new region files.
   * @param encodedRegionName
   * @return array containing the favored nodes' InetSocketAddresses
   */
  InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName);
}
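As a rough illustration of what an implementor of this interface is expected to do, the sketch below keeps the mapping in a concurrent map and resolves the protobuf ServerName entries to socket addresses up front, much as HRegionServer does later in this commit. The class name and field are illustrative, not part of the patch, and the class is assumed to live in the same package as the interface:

    import java.net.InetSocketAddress;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName;

    class SimpleFavoredNodesMap implements FavoredNodesForRegion {
      // Encoded region name -> favored-node addresses; concurrent because the
      // mapping may be updated and read from different threads.
      private final ConcurrentMap<String, InetSocketAddress[]> map =
          new ConcurrentHashMap<String, InetSocketAddress[]>();

      @Override
      public void updateRegionFavoredNodesMapping(String encodedRegionName,
          List<ServerName> favoredNodes) {
        InetSocketAddress[] addrs = new InetSocketAddress[favoredNodes.size()];
        for (int i = 0; i < addrs.length; i++) {
          addrs[i] = new InetSocketAddress(favoredNodes.get(i).getHostName(),
              favoredNodes.get(i).getPort());
        }
        map.put(encodedRegionName, addrs);
      }

      @Override
      public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
        return map.get(encodedRegionName);
      }
    }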

HRegionFileSystem.java

@@ -701,7 +701,7 @@ public class HRegionFileSystem {
     // First check to get the permissions
     FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
     // Write the RegionInfo file content
-    FSDataOutputStream out = FSUtils.create(fs, regionInfoFile, perms);
+    FSDataOutputStream out = FSUtils.create(fs, regionInfoFile, perms, null);
     try {
       out.write(content);
     } finally {

HRegionServer.java

@@ -3516,7 +3516,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterface
     return builder.build();
   }

-  private void updateRegionFavoredNodesMapping(String encodedRegionName,
+  @Override
+  public void updateRegionFavoredNodesMapping(String encodedRegionName,
       List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
     InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
     // Refer to the comment on the declaration of regionFavoredNodesMap on why
@@ -3534,6 +3535,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterface
    * @param encodedRegionName
    * @return array of favored locations
    */
+  @Override
   public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
     return regionFavoredNodesMap.get(encodedRegionName);
   }

HStore.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -790,6 +791,11 @@ public class HStore implements Store {
     } else {
       writerCacheConf = cacheConf;
     }
+    InetSocketAddress[] favoredNodes = null;
+    if (region.getRegionServerServices() != null) {
+      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
+          region.getRegionInfo().getEncodedName());
+    }
     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
         this.getFileSystem(), blocksize)
             .withFilePath(fs.createTempName())
@@ -800,6 +806,7 @@ public class HStore implements Store {
             .withChecksumType(checksumType)
             .withBytesPerChecksum(bytesPerChecksum)
             .withCompression(compression)
+            .withFavoredNodes(favoredNodes)
             .includeMVCCReadpoint(includeMVCCReadpoint)
             .build();
     return w;

RegionServerServices.java

@@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -37,7 +36,7 @@ import org.apache.zookeeper.KeeperException;
  * Services provided by {@link HRegionServer}
  */
 @InterfaceAudience.Private
-public interface RegionServerServices extends OnlineRegions {
+public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegion {
   /**
    * @return True if this regionserver is stopping.
    */

StoreFile.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.DataInput;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
@@ -529,6 +530,7 @@ public class StoreFile {
     private long maxKeyCount = 0;
     private Path dir;
     private Path filePath;
+    private InetSocketAddress[] favoredNodes;
     private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
     private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;
     private boolean includeMVCCReadpoint = true;
@@ -571,6 +573,15 @@
       return this;
     }
+
+    /**
+     * @param favoredNodes an array of favored nodes or possibly null
+     * @return this (for chained invocation)
+     */
+    public WriterBuilder withFavoredNodes(InetSocketAddress[] favoredNodes) {
+      this.favoredNodes = favoredNodes;
+      return this;
+    }

     public WriterBuilder withDataBlockEncoder(HFileDataBlockEncoder encoder) {
       Preconditions.checkNotNull(encoder);
       this.dataBlockEncoder = encoder;
@@ -659,7 +670,7 @@ public class StoreFile {
       }
       return new Writer(fs, filePath, blockSize, compressAlgo, dataBlockEncoder,
           conf, cacheConf, comparator, bloomType, maxKeyCount, checksumType,
-          bytesPerChecksum, includeMVCCReadpoint);
+          bytesPerChecksum, includeMVCCReadpoint, favoredNodes);
     }
   }
@@ -764,6 +775,7 @@ public class StoreFile {
      * @param checksumType the checksum type
      * @param bytesPerChecksum the number of bytes per checksum value
      * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
+     * @param favoredNodes
      * @throws IOException problem writing to FS
      */
     private Writer(FileSystem fs, Path path, int blocksize,
@@ -772,7 +784,8 @@ public class StoreFile {
         CacheConfig cacheConf,
         final KVComparator comparator, BloomType bloomType, long maxKeys,
         final ChecksumType checksumType, final int bytesPerChecksum,
-        final boolean includeMVCCReadpoint) throws IOException {
+        final boolean includeMVCCReadpoint, InetSocketAddress[] favoredNodes)
+        throws IOException {
       this.dataBlockEncoder = dataBlockEncoder != null ?
           dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
       writer = HFile.getWriterFactory(conf, cacheConf)
@@ -783,6 +796,7 @@ public class StoreFile {
           .withComparator(comparator.getRawComparator())
           .withChecksumType(checksumType)
           .withBytesPerChecksum(bytesPerChecksum)
+          .withFavoredNodes(favoredNodes)
           .includeMVCCReadpoint(includeMVCCReadpoint)
           .create();

FSUtils.java

@@ -24,7 +24,9 @@ import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -56,6 +58,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.FileSystemVersionException;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
@@ -66,6 +69,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -262,11 +266,42 @@ public abstract class FSUtils {
    *
    * @param fs {@link FileSystem} on which to write the file
    * @param path {@link Path} to the file to write
+   * @param perm permissions
+   * @param favoredNodes
    * @return output stream to the created file
    * @throws IOException if the file cannot be created
    */
   public static FSDataOutputStream create(FileSystem fs, Path path,
-      FsPermission perm) throws IOException {
+      FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
+    if (fs instanceof HFileSystem) {
+      FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
+      if (backingFs instanceof DistributedFileSystem) {
+        // Try to use the favoredNodes version via reflection to allow backwards-
+        // compatibility.
+        try {
+          return (FSDataOutputStream) (DistributedFileSystem.class
+              .getDeclaredMethod("create", Path.class, FsPermission.class,
+                  boolean.class, int.class, short.class, long.class,
+                  Progressable.class, InetSocketAddress[].class)
+              .invoke(backingFs, path, FsPermission.getDefault(), true,
+                  getDefaultBufferSize(backingFs),
+                  getDefaultReplication(backingFs, path),
+                  getDefaultBlockSize(backingFs, path),
+                  null, favoredNodes));
+        } catch (InvocationTargetException ite) {
+          // Function was properly called, but threw its own exception.
+          throw new IOException(ite.getCause());
+        } catch (NoSuchMethodException e) {
+          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
+        } catch (IllegalArgumentException e) {
+          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
+        } catch (SecurityException e) {
+          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
+        } catch (IllegalAccessException e) {
+          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
+        }
+      }
+    }
+    return create(fs, path, perm, true);
+  }
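Callers opt in by passing a non-null array: when the backing filesystem is a DistributedFileSystem whose client has the favored-nodes create() overload, the hint is forwarded; on older HDFS clients the reflection probe throws NoSuchMethodException and the method falls back to the plain create() path. A hedged usage sketch — the host names, port, and path are made up for illustration, and fs, conf, and content are assumed to be in scope:

    // Illustrative only: favored nodes are a placement hint, not a guarantee.
    InetSocketAddress[] favoredNodes = new InetSocketAddress[] {
        new InetSocketAddress("datanode1.example.com", 50010),
        new InetSocketAddress("datanode2.example.com", 50010),
        new InetSocketAddress("datanode3.example.com", 50010) };
    FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    FSDataOutputStream out = FSUtils.create(fs, new Path("/hbase/example"), perms, favoredNodes);
    try {
      out.write(content);
    } finally {
      out.close();
    }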

MockRegionServer.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -533,6 +534,16 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
     return null;
   }

+  @Override
+  public void updateRegionFavoredNodesMapping(String encodedRegionName,
+      List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
+  }
+
+  @Override
+  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
+    return null;
+  }
+
   @Override
   public MultiResponse replay(RpcController controller, MultiRequest request)
       throws ServiceException {

TestRegionFavoredNodes.java (new file)

@@ -0,0 +1,164 @@
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.fail;

import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.util.Progressable;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests the ability to specify favored nodes for a region.
 */
@Category(MediumTests.class)
public class TestRegionFavoredNodes {

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static HTable table;
  private static final byte[] TABLE_NAME = Bytes.toBytes("table");
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("family");
  private static final int FAVORED_NODES_NUM = 3;
  private static final int REGION_SERVERS = 6;
  private static final int FLUSHES = 3;
  private static Method createWithFavoredNode = null;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    try {
      createWithFavoredNode = DistributedFileSystem.class.getDeclaredMethod("create", Path.class,
          FsPermission.class, boolean.class, int.class, short.class, long.class,
          Progressable.class, InetSocketAddress[].class);
    } catch (NoSuchMethodException nm) {
      return;
    }
    TEST_UTIL.startMiniCluster(REGION_SERVERS);
    table = TEST_UTIL.createTable(TABLE_NAME, COLUMN_FAMILY);
    TEST_UTIL.createMultiRegions(table, COLUMN_FAMILY);
    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (createWithFavoredNode == null) {
      return;
    }
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void testFavoredNodes() throws Exception {
    Assume.assumeTrue(createWithFavoredNode != null);
    // Get the addresses of the datanodes in the cluster.
    InetSocketAddress[] nodes = new InetSocketAddress[REGION_SERVERS];
    List<DataNode> datanodes = TEST_UTIL.getDFSCluster().getDataNodes();
    Method selfAddress;
    try {
      selfAddress = DataNode.class.getMethod("getSelfAddr");
    } catch (NoSuchMethodException ne) {
      selfAddress = DataNode.class.getMethod("getXferAddress");
    }
    for (int i = 0; i < REGION_SERVERS; i++) {
      nodes[i] = (InetSocketAddress)selfAddress.invoke(datanodes.get(i));
    }

    String[] nodeNames = new String[REGION_SERVERS];
    for (int i = 0; i < REGION_SERVERS; i++) {
      nodeNames[i] = nodes[i].getAddress().getHostAddress() + ":" +
          nodes[i].getPort();
    }

    // For each region, choose some datanodes as the favored nodes then assign
    // them as favored nodes through the HRegion.
    for (int i = 0; i < REGION_SERVERS; i++) {
      HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(i);
      List<HRegion> regions = server.getOnlineRegions(TABLE_NAME);
      for (HRegion region : regions) {
        List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes =
            new ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName>(3);
        String encodedRegionName = region.getRegionInfo().getEncodedName();
        for (int j = 0; j < FAVORED_NODES_NUM; j++) {
          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder b =
              org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder();
          b.setHostName(nodes[(i + j) % REGION_SERVERS].getAddress().getHostAddress());
          b.setPort(nodes[(i + j) % REGION_SERVERS].getPort());
          b.setStartCode(-1);
          favoredNodes.add(b.build());
        }
        server.updateRegionFavoredNodesMapping(encodedRegionName, favoredNodes);
      }
    }

    // Write some data to each region and flush. Repeat some number of times to
    // get multiple files for each region.
    for (int i = 0; i < FLUSHES; i++) {
      TEST_UTIL.loadTable(table, COLUMN_FAMILY);
      TEST_UTIL.flush();
    }

    // For each region, check the block locations of each file and ensure that
    // they are consistent with the favored nodes for that region.
    for (int i = 0; i < REGION_SERVERS; i++) {
      HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(i);
      List<HRegion> regions = server.getOnlineRegions(TABLE_NAME);
      for (HRegion region : regions) {
        List<String> files = region.getStoreFileList(new byte[][]{COLUMN_FAMILY});
        for (String file : files) {
          FileStatus status = TEST_UTIL.getDFSCluster().getFileSystem().
              getFileStatus(new Path(new URI(file).getPath()));
          BlockLocation[] lbks =
              ((DistributedFileSystem)TEST_UTIL.getDFSCluster().getFileSystem())
                  .getFileBlockLocations(status, 0, Long.MAX_VALUE);
          for (BlockLocation lbk : lbks) {
            locations:
            for (String info : lbk.getNames()) {
              for (int j = 0; j < FAVORED_NODES_NUM; j++) {
                if (info.equals(nodeNames[(i + j) % REGION_SERVERS])) {
                  continue locations;
                }
              }
              // This block was at a location that was not a favored location.
              fail("Block location " + info + " not a favored node");
            }
          }
        }
      }
    }
  }
}

MockRegionServerServices.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.util;

 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -192,6 +193,16 @@ public class MockRegionServerServices implements RegionServerServices {
     return null;
   }

+  @Override
+  public void updateRegionFavoredNodesMapping(String encodedRegionName,
+      List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
+  }
+
+  @Override
+  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
+    return null;
+  }
+
   @Override
   public Map<String, HRegion> getRecoveringRegions() {
     // TODO Auto-generated method stub
TestFSUtils.java

@@ -247,7 +247,7 @@ public class TestFSUtils {
     // then that the correct file is created
     Path p = new Path("target" + File.separator + UUID.randomUUID().toString());
     try {
-      FSDataOutputStream out = FSUtils.create(fs, p, filePerm);
+      FSDataOutputStream out = FSUtils.create(fs, p, filePerm, null);
       out.close();
       FileStatus stat = fs.getFileStatus(p);
       assertEquals(new FsPermission("700"), stat.getPermission());
@@ -269,13 +269,13 @@ public class TestFSUtils {
     Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
     Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
     try {
-      FSDataOutputStream out = FSUtils.create(fs, p, perms);
+      FSDataOutputStream out = FSUtils.create(fs, p, perms, null);
       out.close();
       assertTrue("The created file should be present", FSUtils.isExists(fs, p));
       // delete the file with recursion as false. Only the file will be deleted.
       FSUtils.delete(fs, p, false);
       // Create another file
-      FSDataOutputStream out1 = FSUtils.create(fs, p1, perms);
+      FSDataOutputStream out1 = FSUtils.create(fs, p1, perms, null);
       out1.close();
       // delete the file with recursion as true. Still only the file will be deleted.
       FSUtils.delete(fs, p1, true);