Revert "HBASE-27091 Speed up the loading of table descriptor from filesystem (#4493)"
This reverts commit 5d0f4dc729
to correct signed-off info.
This commit is contained in:
parent
5d0f4dc729
commit
86b7b027b7
|
@ -231,10 +231,8 @@ public abstract class HBaseServerBase<R extends HBaseRpcServicesBase<?>> extends
|
||||||
// init the filesystem
|
// init the filesystem
|
||||||
this.dataFs = new HFileSystem(this.conf, useHBaseChecksum);
|
this.dataFs = new HFileSystem(this.conf, useHBaseChecksum);
|
||||||
this.dataRootDir = CommonFSUtils.getRootDir(this.conf);
|
this.dataRootDir = CommonFSUtils.getRootDir(this.conf);
|
||||||
int tableDescriptorParallelLoadThreads =
|
|
||||||
conf.getInt("hbase.tabledescriptor.parallel.load.threads", 0);
|
|
||||||
this.tableDescriptors = new FSTableDescriptors(this.dataFs, this.dataRootDir,
|
this.tableDescriptors = new FSTableDescriptors(this.dataFs, this.dataRootDir,
|
||||||
!canUpdateTableDescriptor(), cacheTableDescriptor(), tableDescriptorParallelLoadThreads);
|
!canUpdateTableDescriptor(), cacheTableDescriptor());
|
||||||
}
|
}
|
||||||
|
|
||||||
public HBaseServerBase(Configuration conf, String name) throws IOException {
|
public HBaseServerBase(Configuration conf, String name) throws IOException {
|
||||||
|
@ -468,17 +466,6 @@ public abstract class HBaseServerBase<R extends HBaseRpcServicesBase<?>> extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final void closeTableDescriptors() {
|
|
||||||
if (this.tableDescriptors != null) {
|
|
||||||
LOG.info("Close table descriptors");
|
|
||||||
try {
|
|
||||||
this.tableDescriptors.close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.debug("Failed to close table descriptors gracefully", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* In order to register ShutdownHook, this method is called when HMaster and HRegionServer are
|
* In order to register ShutdownHook, this method is called when HMaster and HRegionServer are
|
||||||
* started. For details, please refer to HBASE-26951
|
* started. For details, please refer to HBASE-26951
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase;
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
@ -27,7 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
* Get, remove and modify table descriptors.
|
* Get, remove and modify table descriptors.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface TableDescriptors extends Closeable {
|
public interface TableDescriptors {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test whether a given table exists, i.e, has a table descriptor.
|
* Test whether a given table exists, i.e, has a table descriptor.
|
||||||
|
@ -36,11 +35,6 @@ public interface TableDescriptors extends Closeable {
|
||||||
return get(tableName) != null;
|
return get(tableName) != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
default void close() throws IOException {
|
|
||||||
// do nothing by default
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return TableDescriptor for tablename
|
* @return TableDescriptor for tablename
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -601,7 +601,6 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
|
||||||
this.rpcServices.stop();
|
this.rpcServices.stop();
|
||||||
}
|
}
|
||||||
closeZooKeeper();
|
closeZooKeeper();
|
||||||
closeTableDescriptors();
|
|
||||||
span.setStatus(StatusCode.OK);
|
span.setStatus(StatusCode.OK);
|
||||||
} finally {
|
} finally {
|
||||||
span.end();
|
span.end();
|
||||||
|
|
|
@ -977,7 +977,6 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
|
||||||
ZNodeClearer.deleteMyEphemeralNodeOnDisk();
|
ZNodeClearer.deleteMyEphemeralNodeOnDisk();
|
||||||
|
|
||||||
closeZooKeeper();
|
closeZooKeeper();
|
||||||
closeTableDescriptors();
|
|
||||||
LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
|
LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
|
||||||
span.setStatus(StatusCode.OK);
|
span.setStatus(StatusCode.OK);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -21,7 +21,6 @@ import com.google.errorprone.annotations.RestrictedApi;
|
||||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -29,12 +28,6 @@ import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import org.apache.commons.lang3.NotImplementedException;
|
import org.apache.commons.lang3.NotImplementedException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
@ -62,7 +55,6 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
|
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation of {@link TableDescriptors} that reads descriptors from the passed filesystem. It
|
* Implementation of {@link TableDescriptors} that reads descriptors from the passed filesystem. It
|
||||||
|
@ -87,8 +79,6 @@ public class FSTableDescriptors implements TableDescriptors {
|
||||||
private final boolean fsreadonly;
|
private final boolean fsreadonly;
|
||||||
private final boolean usecache;
|
private final boolean usecache;
|
||||||
private volatile boolean fsvisited;
|
private volatile boolean fsvisited;
|
||||||
private boolean tableDescriptorParallelLoadEnable = false;
|
|
||||||
private ThreadPoolExecutor executor;
|
|
||||||
|
|
||||||
long cachehits = 0;
|
long cachehits = 0;
|
||||||
long invocations = 0;
|
long invocations = 0;
|
||||||
|
@ -118,23 +108,10 @@ public class FSTableDescriptors implements TableDescriptors {
|
||||||
|
|
||||||
public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
|
public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
|
||||||
final boolean usecache) {
|
final boolean usecache) {
|
||||||
this(fs, rootdir, fsreadonly, usecache, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
|
|
||||||
final boolean usecache, final int tableDescriptorParallelLoadThreads) {
|
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
this.rootdir = rootdir;
|
this.rootdir = rootdir;
|
||||||
this.fsreadonly = fsreadonly;
|
this.fsreadonly = fsreadonly;
|
||||||
this.usecache = usecache;
|
this.usecache = usecache;
|
||||||
if (tableDescriptorParallelLoadThreads > 0) {
|
|
||||||
tableDescriptorParallelLoadEnable = true;
|
|
||||||
executor = new ThreadPoolExecutor(tableDescriptorParallelLoadThreads,
|
|
||||||
tableDescriptorParallelLoadThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
|
|
||||||
new ThreadFactoryBuilder().setNameFormat("FSTableDescriptorLoad-pool-%d").setDaemon(true)
|
|
||||||
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
|
|
||||||
executor.allowCoreThreadTimeOut(true);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void tryUpdateMetaTableDescriptor(Configuration conf) throws IOException {
|
public static void tryUpdateMetaTableDescriptor(Configuration conf) throws IOException {
|
||||||
|
@ -258,56 +235,27 @@ public class FSTableDescriptors implements TableDescriptors {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Map<String, TableDescriptor> getAll() throws IOException {
|
public Map<String, TableDescriptor> getAll() throws IOException {
|
||||||
Map<String, TableDescriptor> tds = new ConcurrentSkipListMap<>();
|
Map<String, TableDescriptor> tds = new TreeMap<>();
|
||||||
if (fsvisited) {
|
if (fsvisited) {
|
||||||
for (Map.Entry<TableName, TableDescriptor> entry : this.cache.entrySet()) {
|
for (Map.Entry<TableName, TableDescriptor> entry : this.cache.entrySet()) {
|
||||||
tds.put(entry.getKey().getNameWithNamespaceInclAsString(), entry.getValue());
|
tds.put(entry.getKey().getNameWithNamespaceInclAsString(), entry.getValue());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Fetching table descriptors from the filesystem.");
|
LOG.trace("Fetching table descriptors from the filesystem.");
|
||||||
final long startTime = EnvironmentEdgeManager.currentTime();
|
boolean allvisited = usecache;
|
||||||
AtomicBoolean allvisited = new AtomicBoolean(usecache);
|
for (Path d : FSUtils.getTableDirs(fs, rootdir)) {
|
||||||
List<Path> tableDirs = FSUtils.getTableDirs(fs, rootdir);
|
TableDescriptor htd = get(CommonFSUtils.getTableName(d));
|
||||||
if (!tableDescriptorParallelLoadEnable) {
|
if (htd == null) {
|
||||||
for (Path dir : tableDirs) {
|
allvisited = false;
|
||||||
internalGet(dir, tds, allvisited);
|
} else {
|
||||||
}
|
tds.put(htd.getTableName().getNameWithNamespaceInclAsString(), htd);
|
||||||
} else {
|
|
||||||
CountDownLatch latch = new CountDownLatch(tableDirs.size());
|
|
||||||
for (Path dir : tableDirs) {
|
|
||||||
executor.submit(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
internalGet(dir, tds, allvisited);
|
|
||||||
} finally {
|
|
||||||
latch.countDown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
latch.await();
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fsvisited = allvisited.get();
|
fsvisited = allvisited;
|
||||||
LOG.info("Fetched table descriptors(size=" + tds.size() + ") cost "
|
|
||||||
+ (EnvironmentEdgeManager.currentTime() - startTime) + "ms.");
|
|
||||||
}
|
}
|
||||||
return tds;
|
return tds;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void internalGet(Path dir, Map<String, TableDescriptor> tds, AtomicBoolean allvisited) {
|
|
||||||
TableDescriptor htd = get(CommonFSUtils.getTableName(dir));
|
|
||||||
if (htd == null) {
|
|
||||||
allvisited.set(false);
|
|
||||||
} else {
|
|
||||||
tds.put(htd.getTableName().getNameWithNamespaceInclAsString(), htd);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Find descriptors by namespace.
|
* Find descriptors by namespace.
|
||||||
* @see #get(org.apache.hadoop.hbase.TableName)
|
* @see #get(org.apache.hadoop.hbase.TableName)
|
||||||
|
@ -431,14 +379,6 @@ public class FSTableDescriptors implements TableDescriptors {
|
||||||
return Bytes.toString(b);
|
return Bytes.toString(b);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
// Close the executor when parallel loading enabled.
|
|
||||||
if (tableDescriptorParallelLoadEnable) {
|
|
||||||
this.executor.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static final class SequenceIdAndFileLength {
|
static final class SequenceIdAndFileLength {
|
||||||
|
|
||||||
final int sequenceId;
|
final int sequenceId;
|
||||||
|
|
|
@ -285,32 +285,6 @@ public class TestFSTableDescriptors {
|
||||||
+ htds.getAll().size(), count + 1, htds.getAll().size());
|
+ htds.getAll().size(), count + 1, htds.getAll().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testParallelGetAll() throws IOException, InterruptedException {
|
|
||||||
final String name = "testParallelGetAll";
|
|
||||||
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
|
||||||
// Enable parallel load table descriptor.
|
|
||||||
FSTableDescriptors htds = new FSTableDescriptorsTest(fs, testDir, true, 20);
|
|
||||||
final int count = 100;
|
|
||||||
// Write out table infos.
|
|
||||||
for (int i = 0; i < count; i++) {
|
|
||||||
htds.createTableDescriptor(
|
|
||||||
TableDescriptorBuilder.newBuilder(TableName.valueOf(name + i)).build());
|
|
||||||
}
|
|
||||||
// add hbase:meta
|
|
||||||
htds
|
|
||||||
.createTableDescriptor(TableDescriptorBuilder.newBuilder(TableName.META_TABLE_NAME).build());
|
|
||||||
|
|
||||||
int getTableDescriptorSize = htds.getAll().size();
|
|
||||||
assertEquals("getAll() didn't return all TableDescriptors, expected: " + (count + 1) + " got: "
|
|
||||||
+ getTableDescriptorSize, count + 1, getTableDescriptorSize);
|
|
||||||
|
|
||||||
// get again to check whether the cache works well
|
|
||||||
getTableDescriptorSize = htds.getAll().size();
|
|
||||||
assertEquals("getAll() didn't return all TableDescriptors with cache, expected: " + (count + 1)
|
|
||||||
+ " got: " + getTableDescriptorSize, count + 1, getTableDescriptorSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetAllOrdering() throws Exception {
|
public void testGetAllOrdering() throws Exception {
|
||||||
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
||||||
|
@ -493,11 +467,6 @@ public class TestFSTableDescriptors {
|
||||||
super(fs, rootdir, false, usecache);
|
super(fs, rootdir, false, usecache);
|
||||||
}
|
}
|
||||||
|
|
||||||
public FSTableDescriptorsTest(FileSystem fs, Path rootdir, boolean usecache,
|
|
||||||
int tableDescriptorParallelLoadThreads) {
|
|
||||||
super(fs, rootdir, false, usecache, tableDescriptorParallelLoadThreads);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TableDescriptor get(TableName tablename) {
|
public TableDescriptor get(TableName tablename) {
|
||||||
LOG.info((super.isUsecache() ? "Cached" : "Non-Cached") + " TableDescriptor.get() on "
|
LOG.info((super.isUsecache() ? "Cached" : "Non-Cached") + " TableDescriptor.get() on "
|
||||||
|
|
Loading…
Reference in New Issue