HBASE-24616 Remove BoundedRecoveredHFilesOutputSink dependency on a TableDescriptor (#1955)

Purge querying the Master for table descriptors; make do with generic options.

Logging cleanup.

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
 Undo fetching Table Descriptor. Not reliably available at recovery time.
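
The gist of the change shows in the BoundedRecoveredHFilesOutputSink hunk below: instead of fetching the table and column-family descriptors to copy compression, block size, encoding, and bloom settings, the sink now builds a plain HFileContext from generic configuration only. The recovered hfile carries metadata on how it was written, so its data can presumably be rewritten with the column family's actual settings later (e.g. at compaction). Condensed from the diff below:

    HFileContext hFileContext = new HFileContextBuilder()
      .withChecksumType(HStore.getChecksumType(walSplitter.conf))
      .withBytesPerCheckSum(HStore.getBytesPerChecksum(walSplitter.conf))
      .withCellComparator(isMetaTable
        ? CellComparatorImpl.META_COMPARATOR : CellComparatorImpl.COMPARATOR)
      .build();
    return writerBuilder.withFileContext(hFileContext).build();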

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Michael Stack, 2020-06-25 11:45:06 -07:00 (committed by stack)
parent 1378776a91, commit e54e3afc5c
8 changed files with 63 additions and 92 deletions

hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java

@@ -258,7 +258,7 @@ public class ServerCrashProcedure
     try {
       splitWALManager.deleteWALDir(serverName);
     } catch (IOException e) {
-      LOG.warn("remove WAL directory of server {} failed, ignore...", serverName, e);
+      LOG.warn("Remove WAL directory of server {} failed, ignore...", serverName, e);
     }
   }

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

@@ -2056,8 +2056,7 @@ public class HRegionServer extends Thread implements
       // SplitLogWorker needs csm. If none, don't start this.
       this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
       splitLogWorker.start();
-    } else {
-      LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null");
+      LOG.debug("SplitLogWorker started");
     }

     // Memstore services.
@@ -2281,7 +2280,7 @@ public class HRegionServer extends Thread implements
     long openProcId = context.getOpenProcId();
     long masterSystemTime = context.getMasterSystemTime();
     rpcServices.checkOpen();
-    LOG.info("Post open deploy tasks for {}, openProcId={}, masterSystemTime={}",
+    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
       r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
     // Do checks to see if we need to compact (references or too many files)
     for (HStore s : r.stores.values()) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java

@@ -435,7 +435,7 @@ public class HStoreFile implements StoreFile {
     if (cfBloomType != BloomType.NONE) {
       initialReader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
       if (hfileBloomType != cfBloomType) {
-        LOG.info("HFile Bloom filter type for "
+        LOG.debug("HFile Bloom filter type for "
           + initialReader.getHFileReader().getName() + ": " + hfileBloomType
           + ", but " + cfBloomType + " specified in column family "
           + "configuration");

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java

@@ -74,8 +74,8 @@ public class SplitLogWorker implements Runnable {
   Thread worker;
   // thread pool which executes recovery work
-  private SplitLogWorkerCoordination coordination;
-  private RegionServerServices server;
+  private final SplitLogWorkerCoordination coordination;
+  private final RegionServerServices server;

   public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
       TaskExecutor splitTaskExecutor) {
@@ -152,7 +152,10 @@ public class SplitLogWorker implements Runnable {
     return true;
   }

-  static Status splitLog(String name, CancelableProgressable p, Configuration conf,
+  /**
+   * @return Result either DONE, RESIGNED, or ERR.
+   */
+  static Status splitLog(String filename, CancelableProgressable p, Configuration conf,
       RegionServerServices server, LastSequenceId sequenceIdChecker, WALFactory factory) {
     Path walDir;
     FileSystem fs;
@@ -164,11 +167,11 @@ public class SplitLogWorker implements Runnable {
       return Status.RESIGNED;
     }
     try {
-      if (!processSyncReplicationWAL(name, conf, server, fs, walDir)) {
+      if (!processSyncReplicationWAL(filename, conf, server, fs, walDir)) {
         return Status.DONE;
       }
     } catch (IOException e) {
-      LOG.warn("failed to process sync replication wal {}", name, e);
+      LOG.warn("failed to process sync replication wal {}", filename, e);
       return Status.RESIGNED;
     }
     // TODO have to correctly figure out when log splitting has been
@@ -178,31 +181,32 @@ public class SplitLogWorker implements Runnable {
       SplitLogWorkerCoordination splitLogWorkerCoordination =
           server.getCoordinatedStateManager() == null ? null
               : server.getCoordinatedStateManager().getSplitLogWorkerCoordination();
-      if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf, p,
+      if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), fs, conf, p,
         sequenceIdChecker, splitLogWorkerCoordination, factory, server)) {
         return Status.PREEMPTED;
       }
     } catch (InterruptedIOException iioe) {
-      LOG.warn("Resigning, interrupted splitting WAL {}", name, iioe);
+      LOG.warn("Resigning, interrupted splitting WAL {}", filename, iioe);
       return Status.RESIGNED;
     } catch (IOException e) {
       if (e instanceof FileNotFoundException) {
         // A wal file may not exist anymore. Nothing can be recovered so move on
-        LOG.warn("Done, WAL {} does not exist anymore", name, e);
+        LOG.warn("Done, WAL {} does not exist anymore", filename, e);
         return Status.DONE;
       }
       Throwable cause = e.getCause();
       if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException
           || cause instanceof ConnectException || cause instanceof SocketTimeoutException)) {
-        LOG.warn("Resigning, can't connect to target regionserver splitting WAL {}", name, e);
+        LOG.warn("Resigning, can't connect to target regionserver splitting WAL {}", filename, e);
         return Status.RESIGNED;
       } else if (cause instanceof InterruptedException) {
-        LOG.warn("Resigning, interrupted splitting WAL {}", name, e);
+        LOG.warn("Resigning, interrupted splitting WAL {}", filename, e);
         return Status.RESIGNED;
       }
-      LOG.warn("Error splitting WAL {}", name, e);
+      LOG.warn("Error splitting WAL {}", filename, e);
       return Status.ERR;
     }
+    LOG.debug("Done splitting WAL {}", filename);
     return Status.DONE;
   }

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java

@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -17,17 +16,14 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hbase.regionserver;

-import java.io.IOException;
 import java.util.concurrent.locks.Lock;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -75,6 +71,24 @@ public class SplitWALCallable implements RSProcedureCallable {
     return EventType.RS_LOG_REPLAY;
   }

+  public static class PreemptedWALSplitException extends HBaseIOException {
+    PreemptedWALSplitException(String wal) {
+      super(wal);
+    }
+  }
+
+  public static class ResignedWALSplitException extends HBaseIOException {
+    ResignedWALSplitException(String wal) {
+      super(wal);
+    }
+  }
+
+  public static class ErrorWALSplitException extends HBaseIOException {
+    ErrorWALSplitException(String wal) {
+      super(wal);
+    }
+  }
+
   @Override
   public Void call() throws Exception {
     if (initError != null) {
@@ -82,14 +96,18 @@ public class SplitWALCallable implements RSProcedureCallable {
     }
     //grab a lock
     splitWALLock = splitWALLocks.acquireLock(walPath);
-    try{
-      splitWal();
-      LOG.info("Successful split of {}", walPath);
-    } catch (IOException e){
-      LOG.warn("Failed split of {}.", walPath, e);
-      throw e;
-    }
-    finally {
+    try {
+      switch (SplitLogWorker.splitLog(walPath, null, rs.getConfiguration(), rs, rs, rs.getWalFactory())) {
+        case DONE:
+          break;
+        case PREEMPTED:
+          throw new PreemptedWALSplitException(this.walPath);
+        case RESIGNED:
+          throw new ResignedWALSplitException(this.walPath);
+        default:
+          throw new ErrorWALSplitException(this.walPath);
+      }
+    } finally {
       splitWALLock.unlock();
     }
     return null;
@@ -98,12 +116,4 @@ public class SplitWALCallable implements RSProcedureCallable {
   public String getWalPath() {
     return this.walPath;
   }
-
-  private void splitWal() throws IOException {
-    SplitLogWorker.TaskExecutor.Status status =
-        SplitLogWorker.splitLog(walPath, null, rs.getConfiguration(), rs, rs, rs.getWalFactory());
-    if (status != SplitLogWorker.TaskExecutor.Status.DONE) {
-      throw new IOException("Failed WAL split, status=" + status + ", wal=" + walPath);
-    }
-  }
 }
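
The three typed exceptions replace the old catch-all IOException so a caller can tell split outcomes apart instead of parsing an exception message. A hypothetical caller sketch (illustrative only; not code from this commit):

    try {
      callable.call();
    } catch (SplitWALCallable.PreemptedWALSplitException e) {
      // Another worker claimed the split task; nothing more to do here.
    } catch (SplitWALCallable.ResignedWALSplitException e) {
      // The worker gave up (interrupt or connectivity); safe to retry elsewhere.
    } catch (SplitWALCallable.ErrorWALSplitException e) {
      // Hard failure splitting the WAL; surface it to the procedure.
    }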

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -48,7 +48,7 @@ public class RSProcedureHandler extends EventHandler {
     try {
       callable.call();
     } catch (Throwable t) {
-      LOG.error("Error when call RSProcedureCallable: ", t);
+      LOG.error("pid=" + this.procId, t);
       error = t;
     } finally {
       ((HRegionServer) server).remoteProcedureComplete(procId, error);

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java

@@ -34,8 +34,6 @@ import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
@@ -67,13 +65,10 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
   // Need a counter to track the opening writers.
   private final AtomicInteger openingWritersNum = new AtomicInteger(0);

-  private final ConcurrentMap<TableName, TableDescriptor> tableDescCache;
-
   public BoundedRecoveredHFilesOutputSink(WALSplitter walSplitter,
       WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
     super(controller, entryBuffers, numWriters);
     this.walSplitter = walSplitter;
-    this.tableDescCache = new ConcurrentHashMap<>();
   }

   @Override
@@ -191,6 +186,10 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
     return false;
   }

+  /**
+   * @return Returns a base HFile without compressions or encodings; good enough for recovery
+   *   given hfile has metadata on how it was written.
+   */
   private StoreFileWriter createRecoveredHFileWriter(TableName tableName, String regionName,
       long seqId, String familyName, boolean isMetaTable) throws IOException {
     Path outputDir = WALSplitUtil.tryCreateRecoveredHFilesDir(walSplitter.rootFS, walSplitter.conf,
@@ -198,43 +197,11 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
     StoreFileWriter.Builder writerBuilder =
         new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, walSplitter.rootFS)
             .withOutputDir(outputDir);
-
-    TableDescriptor tableDesc =
-        tableDescCache.computeIfAbsent(tableName, t -> getTableDescriptor(t));
-    if (tableDesc == null) {
-      throw new IOException("Failed to get table descriptor for table " + tableName);
-    }
-    ColumnFamilyDescriptor cfd = tableDesc.getColumnFamily(Bytes.toBytesBinary(familyName));
-    HFileContext hFileContext = createFileContext(cfd, isMetaTable);
-    return writerBuilder.withFileContext(hFileContext).withBloomType(cfd.getBloomFilterType())
-        .build();
-  }
-
-  private HFileContext createFileContext(ColumnFamilyDescriptor cfd, boolean isMetaTable)
-      throws IOException {
-    return new HFileContextBuilder().withCompression(cfd.getCompressionType())
-        .withChecksumType(HStore.getChecksumType(walSplitter.conf))
-        .withBytesPerCheckSum(HStore.getBytesPerChecksum(walSplitter.conf))
-        .withBlockSize(cfd.getBlocksize()).withCompressTags(cfd.isCompressTags())
-        .withDataBlockEncoding(cfd.getDataBlockEncoding()).withCellComparator(
-          isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparatorImpl.COMPARATOR)
-        .build();
-  }
-
-  private TableDescriptor getTableDescriptor(TableName tableName) {
-    if (walSplitter.rsServices != null) {
-      try {
-        return walSplitter.rsServices.getConnection().getAdmin().getDescriptor(tableName);
-      } catch (IOException e) {
-        LOG.warn("Failed to get table descriptor for {}", tableName, e);
-      }
-    }
-    LOG.info("Failed getting {} table descriptor from master; trying local", tableName);
-    try {
-      return walSplitter.tableDescriptors.get(tableName);
-    } catch (IOException e) {
-      LOG.warn("Failed to get table descriptor for {}", tableName, e);
-      return null;
-    }
+    HFileContext hFileContext = new HFileContextBuilder().
+        withChecksumType(HStore.getChecksumType(walSplitter.conf)).
+        withBytesPerCheckSum(HStore.getBytesPerChecksum(walSplitter.conf)).
+        withCellComparator(isMetaTable ?
+            CellComparatorImpl.META_COMPARATOR : CellComparatorImpl.COMPARATOR).build();
+    return writerBuilder.withFileContext(hFileContext).build();
   }
 }

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java

@@ -37,7 +37,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.master.SplitLogManager;
@@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
@@ -88,7 +86,6 @@ public class WALSplitter {
   final Path rootDir;
   final FileSystem rootFS;
   final RegionServerServices rsServices;
-  final TableDescriptors tableDescriptors;

   // Major subcomponents of the split process.
   // These are separated into inner classes to make testing easier.
@@ -152,12 +149,6 @@ public class WALSplitter {
     this.sequenceIdChecker = idChecker;
     this.splitLogWorkerCoordination = splitLogWorkerCoordination;
     this.rsServices = rsServices;
-    if (rsServices != null) {
-      this.tableDescriptors = rsServices.getTableDescriptors();
-    } else {
-      this.tableDescriptors = new FSTableDescriptors(rootFS, rootDir, true, true);
-    }
     this.walFactory = factory;
     PipelineController controller = new PipelineController();
     this.tmpDirName =