HBASE-22454 refactor WALSplitter
This commit is contained in:
parent
c3642b1dda
commit
325bb00d23
|
@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
|
|||
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
|
||||
import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKListener;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKMetadata;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
|
||||
|
@ -92,7 +92,7 @@ public class ZKSplitLogManagerCoordination extends ZKListener implements
|
|||
@Override
|
||||
public Status finish(ServerName workerName, String logfile) {
|
||||
try {
|
||||
WALSplitter.finishSplitLogFile(logfile, conf);
|
||||
WALSplitUtil.finishSplitLogFile(logfile, conf);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Could not finish splitting of log file " + logfile, e);
|
||||
return Status.ERR;
|
||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
|||
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
|
@ -229,7 +229,7 @@ final class AssignmentManagerUtil {
|
|||
}
|
||||
|
||||
static void checkClosedRegion(MasterProcedureEnv env, RegionInfo regionInfo) throws IOException {
|
||||
if (WALSplitter.hasRecoveredEdits(env.getMasterConfiguration(), regionInfo)) {
|
||||
if (WALSplitUtil.hasRecoveredEdits(env.getMasterConfiguration(), regionInfo)) {
|
||||
throw new IOException("Recovered.edits are found in Region: " + regionInfo +
|
||||
", abort split/merge to prevent data loss");
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -777,11 +777,11 @@ public class MergeTableRegionsProcedure
|
|||
long maxSequenceId = -1L;
|
||||
for (RegionInfo region : regionsToMerge) {
|
||||
maxSequenceId =
|
||||
Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(
|
||||
Math.max(maxSequenceId, WALSplitUtil.getMaxRegionSequenceId(
|
||||
walFS, getWALRegionDir(env, region)));
|
||||
}
|
||||
if (maxSequenceId > 0) {
|
||||
WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion),
|
||||
WALSplitUtil.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion),
|
||||
maxSequenceId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
@ -219,7 +219,7 @@ public class RegionStateStore {
|
|||
private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException {
|
||||
FileSystem walFS = master.getMasterWalManager().getFileSystem();
|
||||
long maxSeqId =
|
||||
WALSplitter.getMaxRegionSequenceId(walFS, FSUtils.getWALRegionDir(
|
||||
WALSplitUtil.getMaxRegionSequenceId(walFS, FSUtils.getWALRegionDir(
|
||||
master.getConfiguration(), region.getTable(), region.getEncodedName()));
|
||||
return maxSeqId > 0 ? maxSeqId + 1 : HConstants.NO_SEQNUM;
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -876,11 +876,11 @@ public class SplitTableRegionProcedure
|
|||
private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException {
|
||||
FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
|
||||
long maxSequenceId =
|
||||
WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion()));
|
||||
WALSplitUtil.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion()));
|
||||
if (maxSequenceId > 0) {
|
||||
WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterOneRI),
|
||||
WALSplitUtil.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterOneRI),
|
||||
maxSequenceId);
|
||||
WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterTwoRI),
|
||||
WALSplitUtil.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterTwoRI),
|
||||
maxSequenceId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
|||
import org.apache.hadoop.hbase.master.TableStateManager;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -117,7 +117,7 @@ public class DisableTableProcedure
|
|||
for (RegionInfo region : env.getAssignmentManager().getRegionStates()
|
||||
.getRegionsOfTable(tableName)) {
|
||||
long maxSequenceId =
|
||||
WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, region));
|
||||
WALSplitUtil.getMaxRegionSequenceId(walFS, getWALRegionDir(env, region));
|
||||
long openSeqNum = maxSequenceId > 0 ? maxSequenceId + 1 : HConstants.NO_SEQNUM;
|
||||
mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, openSeqNum,
|
||||
EnvironmentEdgeManager.currentTime()));
|
||||
|
|
|
@ -175,8 +175,8 @@ import org.apache.hadoop.hbase.wal.WALEdit;
|
|||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.htrace.core.TraceScope;
|
||||
|
@ -1015,15 +1015,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// (particularly if no recovered edits, seqid will be -1).
|
||||
long nextSeqId = maxSeqId + 1;
|
||||
if (!isRestoredRegion) {
|
||||
long maxSeqIdFromFile =
|
||||
WALSplitter.getMaxRegionSequenceId(getWalFileSystem(), getWALRegionDirOfDefaultReplica());
|
||||
long maxSeqIdFromFile = WALSplitUtil.getMaxRegionSequenceId(getWalFileSystem(),
|
||||
getWALRegionDirOfDefaultReplica());
|
||||
nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1;
|
||||
// The openSeqNum will always be increase even for read only region, as we rely on it to
|
||||
// determine whether a region has been successfully reopend, so here we always need to update
|
||||
// the max sequence id file.
|
||||
if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
|
||||
LOG.debug("writing seq id for {}", this.getRegionInfo().getEncodedName());
|
||||
WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), nextSeqId - 1);
|
||||
WALSplitUtil.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(),
|
||||
nextSeqId - 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1181,7 +1182,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// checking region folder exists is due to many tests which delete the table folder while a
|
||||
// table is still online
|
||||
if (getWalFileSystem().exists(getWALRegionDir())) {
|
||||
WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(),
|
||||
WALSplitUtil.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(),
|
||||
mvcc.getReadPoint());
|
||||
}
|
||||
}
|
||||
|
@ -4640,13 +4641,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
NavigableSet<Path> filesUnderRootDir = null;
|
||||
if (!regionDir.equals(defaultRegionDir)) {
|
||||
filesUnderRootDir =
|
||||
WALSplitter.getSplitEditFilesSorted(rootFS, defaultRegionDir);
|
||||
WALSplitUtil.getSplitEditFilesSorted(rootFS, defaultRegionDir);
|
||||
seqid = Math.max(seqid,
|
||||
replayRecoveredEditsForPaths(minSeqIdForTheRegion, rootFS, filesUnderRootDir, reporter,
|
||||
defaultRegionDir));
|
||||
}
|
||||
|
||||
NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(walFS, regionDir);
|
||||
NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(walFS, regionDir);
|
||||
seqid = Math.max(seqid, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS,
|
||||
files, reporter, regionDir));
|
||||
|
||||
|
@ -4659,7 +4660,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// For debugging data loss issues!
|
||||
// If this flag is set, make use of the hfile archiving by making recovered.edits a fake
|
||||
// column family. Have to fake out file type too by casting our recovered.edits as storefiles
|
||||
String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regionDir).getName();
|
||||
String fakeFamilyName = WALSplitUtil.getRegionDirRecoveredEditsDir(regionDir).getName();
|
||||
Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size());
|
||||
for (Path file: files) {
|
||||
fakeStoreFiles.add(
|
||||
|
@ -4736,7 +4737,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
|
||||
}
|
||||
if (skipErrors) {
|
||||
Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
|
||||
Path p = WALSplitUtil.moveAsideBadEditsFile(fs, edits);
|
||||
LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
|
||||
+ "=true so continuing. Renamed " + edits +
|
||||
" as " + p, e);
|
||||
|
@ -4916,7 +4917,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
coprocessorHost.postReplayWALs(this.getRegionInfo(), edits);
|
||||
}
|
||||
} catch (EOFException eof) {
|
||||
Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
|
||||
Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
|
||||
msg = "EnLongAddered EOF. Most likely due to Master failure during " +
|
||||
"wal splitting, so we have this data in another edit. " +
|
||||
"Continuing, but renaming " + edits + " as " + p;
|
||||
|
@ -4926,7 +4927,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// If the IOE resulted from bad file format,
|
||||
// then this problem is idempotent and retrying won't help
|
||||
if (ioe.getCause() instanceof ParseException) {
|
||||
Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
|
||||
Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
|
||||
msg = "File corruption enLongAddered! " +
|
||||
"Continuing, but renaming " + edits + " as " + p;
|
||||
LOG.warn(msg, ioe);
|
||||
|
|
|
@ -143,7 +143,8 @@ import org.apache.hadoop.hbase.util.Strings;
|
|||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
@ -1112,14 +1113,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
* @throws IOException
|
||||
*/
|
||||
private OperationStatus [] doReplayBatchOp(final HRegion region,
|
||||
final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
|
||||
final List<MutationReplay> mutations, long replaySeqId) throws IOException {
|
||||
long before = EnvironmentEdgeManager.currentTime();
|
||||
boolean batchContainsPuts = false, batchContainsDelete = false;
|
||||
try {
|
||||
for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
|
||||
WALSplitter.MutationReplay m = it.next();
|
||||
for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
|
||||
MutationReplay m = it.next();
|
||||
|
||||
if (m.type == MutationType.PUT) {
|
||||
if (m.getType() == MutationType.PUT) {
|
||||
batchContainsPuts = true;
|
||||
} else {
|
||||
batchContainsDelete = true;
|
||||
|
@ -1163,7 +1164,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
regionServer.cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
return region.batchReplay(mutations.toArray(
|
||||
new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
|
||||
new MutationReplay[mutations.size()]), replaySeqId);
|
||||
} finally {
|
||||
updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
|
||||
}
|
||||
|
@ -2220,7 +2221,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
entry.getKey().getWriteTime());
|
||||
}
|
||||
Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
|
||||
List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
|
||||
List<MutationReplay> edits = WALSplitUtil.getMutationsFromWALEntry(entry,
|
||||
cells, walEntry, durability);
|
||||
if (coprocessorHost != null) {
|
||||
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
|
||||
|
|
|
@ -59,9 +59,9 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.EntryBuffers;
|
||||
import org.apache.hadoop.hbase.wal.OutputSink;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
|
||||
|
|
|
@ -133,7 +133,7 @@ import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
|
|||
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
|
@ -4462,7 +4462,7 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
// This is special case if a region is left after split
|
||||
he.hdfsOnlyEdits = true;
|
||||
FileStatus[] subDirs = fs.listStatus(regionDir.getPath());
|
||||
Path ePath = WALSplitter.getRegionDirRecoveredEditsDir(regionDir.getPath());
|
||||
Path ePath = WALSplitUtil.getRegionDirRecoveredEditsDir(regionDir.getPath());
|
||||
for (FileStatus subDir : subDirs) {
|
||||
errors.progress();
|
||||
String sdName = subDir.getPath().getName();
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Class that manages the output streams from the log splitting process.
|
||||
* Bounded means the output streams will be no more than the size of threadpool
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(BoundedLogWriterCreationOutputSink.class);
|
||||
|
||||
private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
|
||||
|
||||
public BoundedLogWriterCreationOutputSink(WALSplitter walSplitter,
|
||||
WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
|
||||
super(walSplitter, controller, entryBuffers, numWriters);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Path> finishWritingAndClose() throws IOException {
|
||||
boolean isSuccessful;
|
||||
List<Path> result;
|
||||
try {
|
||||
isSuccessful = finishWriting(false);
|
||||
} finally {
|
||||
result = close();
|
||||
}
|
||||
if (isSuccessful) {
|
||||
splits = result;
|
||||
}
|
||||
return splits;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown,
|
||||
List<Path> paths) throws InterruptedException, ExecutionException {
|
||||
for (final Map.Entry<byte[], WALSplitter.RegionEntryBuffer> buffer : entryBuffers.buffers
|
||||
.entrySet()) {
|
||||
LOG.info("Submitting writeThenClose of {}",
|
||||
Arrays.toString(buffer.getValue().encodedRegionName));
|
||||
completionService.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
Path dst = writeThenClose(buffer.getValue());
|
||||
paths.add(dst);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
boolean progress_failed = false;
|
||||
for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
|
||||
Future<Void> future = completionService.take();
|
||||
future.get();
|
||||
if (!progress_failed && reporter != null && !reporter.progress()) {
|
||||
progress_failed = true;
|
||||
}
|
||||
}
|
||||
|
||||
return progress_failed;
|
||||
}
|
||||
|
||||
/**
|
||||
* since the splitting process may create multiple output files, we need a map
|
||||
* regionRecoverStatMap to track the output count of each region.
|
||||
* @return a map from encoded region ID to the number of edits written out for that region.
|
||||
*/
|
||||
@Override
|
||||
public Map<byte[], Long> getOutputCounts() {
|
||||
Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
|
||||
for (Map.Entry<String, Long> entry : regionRecoverStatMap.entrySet()) {
|
||||
regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
|
||||
}
|
||||
return regionRecoverStatMapResult;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of recovered regions
|
||||
*/
|
||||
@Override
|
||||
public int getNumberOfRecoveredRegions() {
|
||||
return regionRecoverStatMap.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Append the buffer to a new recovered edits file, then close it after all done
|
||||
* @param buffer contain all entries of a certain region
|
||||
* @throws IOException when closeWriter failed
|
||||
*/
|
||||
@Override
|
||||
public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
|
||||
writeThenClose(buffer);
|
||||
}
|
||||
|
||||
private Path writeThenClose(WALSplitter.RegionEntryBuffer buffer) throws IOException {
|
||||
WALSplitter.WriterAndPath wap = appendBuffer(buffer, false);
|
||||
if (wap != null) {
|
||||
String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
|
||||
Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
|
||||
if (value != null) {
|
||||
Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
|
||||
regionRecoverStatMap.put(encodedRegionName, newValue);
|
||||
}
|
||||
}
|
||||
|
||||
Path dst = null;
|
||||
List<IOException> thrown = new ArrayList<>();
|
||||
if (wap != null) {
|
||||
dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
|
||||
}
|
||||
if (!thrown.isEmpty()) {
|
||||
throw MultipleIOException.createIOException(thrown);
|
||||
}
|
||||
return dst;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,158 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Class which accumulates edits and separates them into a buffer per region while simultaneously
|
||||
* accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then
|
||||
* pull region-specific buffers from this class.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class EntryBuffers {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class);
|
||||
|
||||
PipelineController controller;
|
||||
|
||||
Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
|
||||
/*
|
||||
* Track which regions are currently in the middle of writing. We don't allow an IO thread to pick
|
||||
* up bytes from a region if we're already writing data for that region in a different IO thread.
|
||||
*/
|
||||
Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
|
||||
|
||||
long totalBuffered = 0;
|
||||
long maxHeapUsage;
|
||||
boolean splitWriterCreationBounded;
|
||||
|
||||
public EntryBuffers(PipelineController controller, long maxHeapUsage) {
|
||||
this(controller, maxHeapUsage, false);
|
||||
}
|
||||
|
||||
public EntryBuffers(PipelineController controller, long maxHeapUsage,
|
||||
boolean splitWriterCreationBounded) {
|
||||
this.controller = controller;
|
||||
this.maxHeapUsage = maxHeapUsage;
|
||||
this.splitWriterCreationBounded = splitWriterCreationBounded;
|
||||
}
|
||||
|
||||
/**
|
||||
* Append a log entry into the corresponding region buffer. Blocks if the total heap usage has
|
||||
* crossed the specified threshold.
|
||||
*/
|
||||
public void appendEntry(WAL.Entry entry) throws InterruptedException, IOException {
|
||||
WALKey key = entry.getKey();
|
||||
RegionEntryBuffer buffer;
|
||||
long incrHeap;
|
||||
synchronized (this) {
|
||||
buffer = buffers.get(key.getEncodedRegionName());
|
||||
if (buffer == null) {
|
||||
buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
|
||||
buffers.put(key.getEncodedRegionName(), buffer);
|
||||
}
|
||||
incrHeap = buffer.appendEntry(entry);
|
||||
}
|
||||
|
||||
// If we crossed the chunk threshold, wait for more space to be available
|
||||
synchronized (controller.dataAvailable) {
|
||||
totalBuffered += incrHeap;
|
||||
while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
|
||||
LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
|
||||
controller.dataAvailable.wait(2000);
|
||||
}
|
||||
controller.dataAvailable.notifyAll();
|
||||
}
|
||||
controller.checkForErrors();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return RegionEntryBuffer a buffer of edits to be written.
|
||||
*/
|
||||
synchronized RegionEntryBuffer getChunkToWrite() {
|
||||
// The core part of limiting opening writers is it doesn't return chunk only if the
|
||||
// heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each
|
||||
// region during splitting. It will flush all the logs in the buffer after splitting
|
||||
// through a threadpool, which means the number of writers it created is under control.
|
||||
if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
|
||||
return null;
|
||||
}
|
||||
long biggestSize = 0;
|
||||
byte[] biggestBufferKey = null;
|
||||
|
||||
for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
|
||||
long size = entry.getValue().heapSize();
|
||||
if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
|
||||
biggestSize = size;
|
||||
biggestBufferKey = entry.getKey();
|
||||
}
|
||||
}
|
||||
if (biggestBufferKey == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
|
||||
currentlyWriting.add(biggestBufferKey);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
void doneWriting(RegionEntryBuffer buffer) {
|
||||
synchronized (this) {
|
||||
boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
|
||||
assert removed;
|
||||
}
|
||||
long size = buffer.heapSize();
|
||||
|
||||
synchronized (controller.dataAvailable) {
|
||||
totalBuffered -= size;
|
||||
// We may unblock writers
|
||||
controller.dataAvailable.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized boolean isRegionCurrentlyWriting(byte[] region) {
|
||||
return currentlyWriting.contains(region);
|
||||
}
|
||||
|
||||
public void waitUntilDrained() {
|
||||
synchronized (controller.dataAvailable) {
|
||||
while (totalBuffered > 0) {
|
||||
try {
|
||||
controller.dataAvailable.wait(2000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
|
||||
Thread.interrupted();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,460 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath;
|
||||
import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.log.HBaseMarkers;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
|
||||
|
||||
/**
|
||||
* Class that manages the output streams from the log splitting process.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class LogRecoveredEditsOutputSink extends OutputSink {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(LogRecoveredEditsOutputSink.class);
|
||||
private WALSplitter walSplitter;
|
||||
private FileSystem walFS;
|
||||
private Configuration conf;
|
||||
|
||||
public LogRecoveredEditsOutputSink(WALSplitter walSplitter,
|
||||
WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
|
||||
// More threads could potentially write faster at the expense
|
||||
// of causing more disk seeks as the logs are split.
|
||||
// 3. After a certain setting (probably around 3) the
|
||||
// process will be bound on the reader in the current
|
||||
// implementation anyway.
|
||||
super(controller, entryBuffers, numWriters);
|
||||
this.walSplitter = walSplitter;
|
||||
this.walFS = walSplitter.walFS;
|
||||
this.conf = walSplitter.conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return null if failed to report progress
|
||||
*/
|
||||
@Override
|
||||
public List<Path> finishWritingAndClose() throws IOException {
|
||||
boolean isSuccessful = false;
|
||||
List<Path> result = null;
|
||||
try {
|
||||
isSuccessful = finishWriting(false);
|
||||
} finally {
|
||||
result = close();
|
||||
List<IOException> thrown = closeLogWriters(null);
|
||||
if (CollectionUtils.isNotEmpty(thrown)) {
|
||||
throw MultipleIOException.createIOException(thrown);
|
||||
}
|
||||
}
|
||||
if (isSuccessful) {
|
||||
splits = result;
|
||||
}
|
||||
return splits;
|
||||
}
|
||||
|
||||
// delete the one with fewer wal entries
|
||||
private void deleteOneWithFewerEntries(WALSplitter.WriterAndPath wap, Path dst)
|
||||
throws IOException {
|
||||
long dstMinLogSeqNum = -1L;
|
||||
try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
|
||||
WAL.Entry entry = reader.next();
|
||||
if (entry != null) {
|
||||
dstMinLogSeqNum = entry.getKey().getSequenceId();
|
||||
}
|
||||
} catch (EOFException e) {
|
||||
LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
|
||||
e);
|
||||
}
|
||||
if (wap.minLogSeqNum < dstMinLogSeqNum) {
|
||||
LOG.warn("Found existing old edits file. It could be the result of a previous failed"
|
||||
+ " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
|
||||
+ walFS.getFileStatus(dst).getLen());
|
||||
if (!walFS.delete(dst, false)) {
|
||||
LOG.warn("Failed deleting of old {}", dst);
|
||||
throw new IOException("Failed deleting of old " + dst);
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.path
|
||||
+ ", length=" + walFS.getFileStatus(wap.path).getLen());
|
||||
if (!walFS.delete(wap.path, false)) {
|
||||
LOG.warn("Failed deleting of {}", wap.path);
|
||||
throw new IOException("Failed deleting of " + wap.path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all of the output streams.
|
||||
* @return the list of paths written.
|
||||
*/
|
||||
List<Path> close() throws IOException {
|
||||
Preconditions.checkState(!closeAndCleanCompleted);
|
||||
|
||||
final List<Path> paths = new ArrayList<>();
|
||||
final List<IOException> thrown = Lists.newArrayList();
|
||||
ThreadPoolExecutor closeThreadPool =
|
||||
Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
|
||||
private int count = 1;
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r, "split-log-closeStream-" + count++);
|
||||
return t;
|
||||
}
|
||||
});
|
||||
CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
|
||||
boolean progress_failed;
|
||||
try {
|
||||
progress_failed = executeCloseTask(completionService, thrown, paths);
|
||||
} catch (InterruptedException e) {
|
||||
IOException iie = new InterruptedIOException();
|
||||
iie.initCause(e);
|
||||
throw iie;
|
||||
} catch (ExecutionException e) {
|
||||
throw new IOException(e.getCause());
|
||||
} finally {
|
||||
closeThreadPool.shutdownNow();
|
||||
}
|
||||
if (!thrown.isEmpty()) {
|
||||
throw MultipleIOException.createIOException(thrown);
|
||||
}
|
||||
writersClosed = true;
|
||||
closeAndCleanCompleted = true;
|
||||
if (progress_failed) {
|
||||
return null;
|
||||
}
|
||||
return paths;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param completionService threadPool to execute the closing tasks
|
||||
* @param thrown store the exceptions
|
||||
* @param paths arrayList to store the paths written
|
||||
* @return if close tasks executed successful
|
||||
*/
|
||||
boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown,
|
||||
List<Path> paths) throws InterruptedException, ExecutionException {
|
||||
for (final Map.Entry<String, WALSplitter.SinkWriter> writersEntry : writers.entrySet()) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(
|
||||
"Submitting close of " + ((WALSplitter.WriterAndPath) writersEntry.getValue()).path);
|
||||
}
|
||||
completionService.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
WALSplitter.WriterAndPath wap = (WALSplitter.WriterAndPath) writersEntry.getValue();
|
||||
Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
|
||||
paths.add(dst);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
boolean progress_failed = false;
|
||||
for (int i = 0, n = this.writers.size(); i < n; i++) {
|
||||
Future<Void> future = completionService.take();
|
||||
future.get();
|
||||
if (!progress_failed && reporter != null && !reporter.progress()) {
|
||||
progress_failed = true;
|
||||
}
|
||||
}
|
||||
return progress_failed;
|
||||
}
|
||||
|
||||
Path closeWriter(String encodedRegionName, WALSplitter.WriterAndPath wap,
|
||||
List<IOException> thrown) throws IOException {
|
||||
LOG.trace("Closing {}", wap.path);
|
||||
try {
|
||||
wap.writer.close();
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Could not close log at {}", wap.path, ioe);
|
||||
thrown.add(ioe);
|
||||
return null;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Closed wap " + wap.path + " (wrote " + wap.editsWritten + " edits, skipped "
|
||||
+ wap.editsSkipped + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms");
|
||||
}
|
||||
if (wap.editsWritten == 0) {
|
||||
// just remove the empty recovered.edits file
|
||||
if (walFS.exists(wap.path) && !walFS.delete(wap.path, false)) {
|
||||
LOG.warn("Failed deleting empty {}", wap.path);
|
||||
throw new IOException("Failed deleting empty " + wap.path);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
Path dst = getCompletedRecoveredEditsFilePath(wap.path,
|
||||
regionMaximumEditLogSeqNum.get(encodedRegionName));
|
||||
try {
|
||||
if (!dst.equals(wap.path) && walFS.exists(dst)) {
|
||||
deleteOneWithFewerEntries(wap, dst);
|
||||
}
|
||||
// Skip the unit tests which create a splitter that reads and
|
||||
// writes the data without touching disk.
|
||||
// TestHLogSplit#testThreading is an example.
|
||||
if (walFS.exists(wap.path)) {
|
||||
if (!walFS.rename(wap.path, dst)) {
|
||||
throw new IOException("Failed renaming " + wap.path + " to " + dst);
|
||||
}
|
||||
LOG.info("Rename {} to {}", wap.path, dst);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Could not rename {} to {}", wap.path, dst, ioe);
|
||||
thrown.add(ioe);
|
||||
return null;
|
||||
}
|
||||
return dst;
|
||||
}
|
||||
|
||||
private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
|
||||
if (writersClosed) {
|
||||
return thrown;
|
||||
}
|
||||
if (thrown == null) {
|
||||
thrown = Lists.newArrayList();
|
||||
}
|
||||
try {
|
||||
for (WriterThread writerThread : writerThreads) {
|
||||
while (writerThread.isAlive()) {
|
||||
writerThread.setShouldStop(true);
|
||||
writerThread.interrupt();
|
||||
try {
|
||||
writerThread.join(10);
|
||||
} catch (InterruptedException e) {
|
||||
IOException iie = new InterruptedIOException();
|
||||
iie.initCause(e);
|
||||
throw iie;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
WALSplitter.WriterAndPath wap = null;
|
||||
for (WALSplitter.SinkWriter tmpWAP : writers.values()) {
|
||||
try {
|
||||
wap = (WALSplitter.WriterAndPath) tmpWAP;
|
||||
wap.writer.close();
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Couldn't close log at {}", wap.path, ioe);
|
||||
thrown.add(ioe);
|
||||
continue;
|
||||
}
|
||||
LOG.info("Closed log " + wap.path + " (wrote " + wap.editsWritten + " edits in "
|
||||
+ (wap.nanosSpent / 1000 / 1000) + "ms)");
|
||||
}
|
||||
writersClosed = true;
|
||||
}
|
||||
|
||||
return thrown;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a writer and path for a log starting at the given entry. This function is threadsafe so
|
||||
* long as multiple threads are always acting on different regions.
|
||||
* @return null if this region shouldn't output any logs
|
||||
*/
|
||||
WALSplitter.WriterAndPath getWriterAndPath(WAL.Entry entry, boolean reusable) throws IOException {
|
||||
byte[] region = entry.getKey().getEncodedRegionName();
|
||||
String regionName = Bytes.toString(region);
|
||||
WALSplitter.WriterAndPath ret = (WALSplitter.WriterAndPath) writers.get(regionName);
|
||||
if (ret != null) {
|
||||
return ret;
|
||||
}
|
||||
// If we already decided that this region doesn't get any output
|
||||
// we don't need to check again.
|
||||
if (blacklistedRegions.contains(region)) {
|
||||
return null;
|
||||
}
|
||||
ret = createWAP(region, entry);
|
||||
if (ret == null) {
|
||||
blacklistedRegions.add(region);
|
||||
return null;
|
||||
}
|
||||
if (reusable) {
|
||||
writers.put(regionName, ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a path with a write for that path. caller should close.
|
||||
*/
|
||||
WALSplitter.WriterAndPath createWAP(byte[] region, WAL.Entry entry) throws IOException {
|
||||
String tmpDirName = walSplitter.conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
|
||||
HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
|
||||
Path regionedits = getRegionSplitEditsPath(entry,
|
||||
walSplitter.getFileBeingSplit().getPath().getName(), tmpDirName, conf);
|
||||
if (regionedits == null) {
|
||||
return null;
|
||||
}
|
||||
FileSystem walFs = FSUtils.getWALFileSystem(conf);
|
||||
if (walFs.exists(regionedits)) {
|
||||
LOG.warn("Found old edits file. It could be the "
|
||||
+ "result of a previous failed split attempt. Deleting " + regionedits + ", length="
|
||||
+ walFs.getFileStatus(regionedits).getLen());
|
||||
if (!walFs.delete(regionedits, false)) {
|
||||
LOG.warn("Failed delete of old {}", regionedits);
|
||||
}
|
||||
}
|
||||
WALProvider.Writer w = walSplitter.createWriter(regionedits);
|
||||
LOG.debug("Creating writer path={}", regionedits);
|
||||
return new WALSplitter.WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
|
||||
}
|
||||
|
||||
|
||||
|
||||
void filterCellByStore(WAL.Entry logEntry) {
|
||||
Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores()
|
||||
.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
|
||||
if (MapUtils.isEmpty(maxSeqIdInStores)) {
|
||||
return;
|
||||
}
|
||||
// Create the array list for the cells that aren't filtered.
|
||||
// We make the assumption that most cells will be kept.
|
||||
ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
|
||||
for (Cell cell : logEntry.getEdit().getCells()) {
|
||||
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
|
||||
keptCells.add(cell);
|
||||
} else {
|
||||
byte[] family = CellUtil.cloneFamily(cell);
|
||||
Long maxSeqId = maxSeqIdInStores.get(family);
|
||||
// Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
|
||||
// or the master was crashed before and we can not get the information.
|
||||
if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
|
||||
keptCells.add(cell);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Anything in the keptCells array list is still live.
|
||||
// So rather than removing the cells from the array list
|
||||
// which would be an O(n^2) operation, we just replace the list
|
||||
logEntry.getEdit().setCells(keptCells);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
|
||||
appendBuffer(buffer, true);
|
||||
}
|
||||
|
||||
WALSplitter.WriterAndPath appendBuffer(WALSplitter.RegionEntryBuffer buffer, boolean reusable)
|
||||
throws IOException {
|
||||
List<WAL.Entry> entries = buffer.entryBuffer;
|
||||
if (entries.isEmpty()) {
|
||||
LOG.warn("got an empty buffer, skipping");
|
||||
return null;
|
||||
}
|
||||
|
||||
WALSplitter.WriterAndPath wap = null;
|
||||
|
||||
long startTime = System.nanoTime();
|
||||
try {
|
||||
int editsCount = 0;
|
||||
|
||||
for (WAL.Entry logEntry : entries) {
|
||||
if (wap == null) {
|
||||
wap = getWriterAndPath(logEntry, reusable);
|
||||
if (wap == null) {
|
||||
// This log spews the full edit. Can be massive in the log. Enable only debugging
|
||||
// WAL lost edit issues.
|
||||
LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
filterCellByStore(logEntry);
|
||||
if (!logEntry.getEdit().isEmpty()) {
|
||||
wap.writer.append(logEntry);
|
||||
this.updateRegionMaximumEditLogSeqNum(logEntry);
|
||||
editsCount++;
|
||||
} else {
|
||||
wap.incrementSkippedEdits(1);
|
||||
}
|
||||
}
|
||||
// Pass along summary statistics
|
||||
wap.incrementEdits(editsCount);
|
||||
wap.incrementNanoTime(System.nanoTime() - startTime);
|
||||
} catch (IOException e) {
|
||||
e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
|
||||
LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
|
||||
throw e;
|
||||
}
|
||||
return wap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean keepRegionEvent(WAL.Entry entry) {
|
||||
ArrayList<Cell> cells = entry.getEdit().getCells();
|
||||
for (Cell cell : cells) {
|
||||
if (WALEdit.isCompactionMarker(cell)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a map from encoded region ID to the number of edits written out for that region.
|
||||
*/
|
||||
@Override
|
||||
public Map<byte[], Long> getOutputCounts() {
|
||||
TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
for (Map.Entry<String, WALSplitter.SinkWriter> entry : writers.entrySet()) {
|
||||
ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumberOfRecoveredRegions() {
|
||||
return writers.size();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,252 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* The following class is an abstraction class to provide a common interface to support different
|
||||
* ways of consuming recovered edits.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class OutputSink {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);
|
||||
|
||||
protected WALSplitter.PipelineController controller;
|
||||
protected EntryBuffers entryBuffers;
|
||||
|
||||
protected ConcurrentHashMap<String, WALSplitter.SinkWriter> writers = new ConcurrentHashMap<>();
|
||||
protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
protected final List<WriterThread> writerThreads = Lists.newArrayList();
|
||||
|
||||
/* Set of regions which we've decided should not output edits */
|
||||
protected final Set<byte[]> blacklistedRegions =
|
||||
Collections.synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR));
|
||||
|
||||
protected boolean closeAndCleanCompleted = false;
|
||||
|
||||
protected boolean writersClosed = false;
|
||||
|
||||
protected final int numThreads;
|
||||
|
||||
protected CancelableProgressable reporter = null;
|
||||
|
||||
protected AtomicLong skippedEdits = new AtomicLong();
|
||||
|
||||
protected List<Path> splits = null;
|
||||
|
||||
public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
|
||||
int numWriters) {
|
||||
numThreads = numWriters;
|
||||
this.controller = controller;
|
||||
this.entryBuffers = entryBuffers;
|
||||
}
|
||||
|
||||
void setReporter(CancelableProgressable reporter) {
|
||||
this.reporter = reporter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the threads that will pump data from the entryBuffers to the output files.
|
||||
*/
|
||||
public synchronized void startWriterThreads() {
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
WriterThread t = new WriterThread(controller, entryBuffers, this, i);
|
||||
t.start();
|
||||
writerThreads.add(t);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update region's maximum edit log SeqNum.
|
||||
*/
|
||||
void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
|
||||
synchronized (regionMaximumEditLogSeqNum) {
|
||||
String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
|
||||
Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
|
||||
if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
|
||||
regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of currently opened writers
|
||||
*/
|
||||
int getNumOpenWriters() {
|
||||
return this.writers.size();
|
||||
}
|
||||
|
||||
long getSkippedEdits() {
|
||||
return this.skippedEdits.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for writer threads to dump all info to the sink
|
||||
* @return true when there is no error
|
||||
*/
|
||||
protected boolean finishWriting(boolean interrupt) throws IOException {
|
||||
LOG.debug("Waiting for split writer threads to finish");
|
||||
boolean progress_failed = false;
|
||||
for (WriterThread t : writerThreads) {
|
||||
t.finish();
|
||||
}
|
||||
if (interrupt) {
|
||||
for (WriterThread t : writerThreads) {
|
||||
t.interrupt(); // interrupt the writer threads. We are stopping now.
|
||||
}
|
||||
}
|
||||
|
||||
for (WriterThread t : writerThreads) {
|
||||
if (!progress_failed && reporter != null && !reporter.progress()) {
|
||||
progress_failed = true;
|
||||
}
|
||||
try {
|
||||
t.join();
|
||||
} catch (InterruptedException ie) {
|
||||
IOException iie = new InterruptedIOException();
|
||||
iie.initCause(ie);
|
||||
throw iie;
|
||||
}
|
||||
}
|
||||
controller.checkForErrors();
|
||||
LOG.info("{} split writers finished; closing.", this.writerThreads.size());
|
||||
return (!progress_failed);
|
||||
}
|
||||
|
||||
public abstract List<Path> finishWritingAndClose() throws IOException;
|
||||
|
||||
/**
|
||||
* @return a map from encoded region ID to the number of edits written out for that region.
|
||||
*/
|
||||
public abstract Map<byte[], Long> getOutputCounts();
|
||||
|
||||
/**
|
||||
* @return number of regions we've recovered
|
||||
*/
|
||||
public abstract int getNumberOfRecoveredRegions();
|
||||
|
||||
/**
|
||||
* @param buffer A WAL Edit Entry
|
||||
*/
|
||||
public abstract void append(WALSplitter.RegionEntryBuffer buffer) throws IOException;
|
||||
|
||||
/**
|
||||
* WriterThread call this function to help flush internal remaining edits in buffer before close
|
||||
* @return true when underlying sink has something to flush
|
||||
*/
|
||||
public boolean flush() throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will
|
||||
* want to get all of those edits.
|
||||
* @return Return true if this sink wants to accept this region-level WALEdit.
|
||||
*/
|
||||
public abstract boolean keepRegionEvent(WAL.Entry entry);
|
||||
|
||||
public static class WriterThread extends Thread {
|
||||
private volatile boolean shouldStop = false;
|
||||
private WALSplitter.PipelineController controller;
|
||||
private EntryBuffers entryBuffers;
|
||||
private OutputSink outputSink = null;
|
||||
|
||||
WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
|
||||
OutputSink sink, int i) {
|
||||
super(Thread.currentThread().getName() + "-Writer-" + i);
|
||||
this.controller = controller;
|
||||
this.entryBuffers = entryBuffers;
|
||||
outputSink = sink;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
doRun();
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Exiting thread", t);
|
||||
controller.writerThreadError(t);
|
||||
}
|
||||
}
|
||||
|
||||
private void doRun() throws IOException {
|
||||
LOG.trace("Writer thread starting");
|
||||
while (true) {
|
||||
WALSplitter.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
|
||||
if (buffer == null) {
|
||||
// No data currently available, wait on some more to show up
|
||||
synchronized (controller.dataAvailable) {
|
||||
if (shouldStop && !this.outputSink.flush()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
controller.dataAvailable.wait(500);
|
||||
} catch (InterruptedException ie) {
|
||||
if (!shouldStop) {
|
||||
throw new RuntimeException(ie);
|
||||
}
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
assert buffer != null;
|
||||
try {
|
||||
writeBuffer(buffer);
|
||||
} finally {
|
||||
entryBuffers.doneWriting(buffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void writeBuffer(WALSplitter.RegionEntryBuffer buffer) throws IOException {
|
||||
outputSink.append(buffer);
|
||||
}
|
||||
|
||||
void setShouldStop(boolean shouldStop) {
|
||||
this.shouldStop = shouldStop;
|
||||
}
|
||||
|
||||
void finish() {
|
||||
synchronized (controller.dataAvailable) {
|
||||
shouldStop = true;
|
||||
controller.dataAvailable.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,523 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.UUID;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
|
||||
/**
|
||||
* This class provides static methods to support WAL splitting related works
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class WALSplitUtil {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(WALSplitUtil.class);
|
||||
|
||||
private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
|
||||
private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
|
||||
private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
|
||||
private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
|
||||
private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
|
||||
|
||||
private WALSplitUtil() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Completes the work done by splitLogFile by archiving logs
|
||||
* <p>
|
||||
* It is invoked by SplitLogManager once it knows that one of the SplitLogWorkers have completed
|
||||
* the splitLogFile() part. If the master crashes then this function might get called multiple
|
||||
* times.
|
||||
* <p>
|
||||
* @param logfile
|
||||
* @param conf
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException {
|
||||
Path walDir = FSUtils.getWALRootDir(conf);
|
||||
Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
Path walPath;
|
||||
if (FSUtils.isStartingWithPath(walDir, logfile)) {
|
||||
walPath = new Path(logfile);
|
||||
} else {
|
||||
walPath = new Path(walDir, logfile);
|
||||
}
|
||||
finishSplitLogFile(walDir, oldLogDir, walPath, conf);
|
||||
}
|
||||
|
||||
static void finishSplitLogFile(Path walDir, Path oldWALDir, Path walPath,
|
||||
Configuration conf) throws IOException {
|
||||
List<Path> processedLogs = new ArrayList<>();
|
||||
List<Path> corruptedLogs = new ArrayList<>();
|
||||
FileSystem walFS = walDir.getFileSystem(conf);
|
||||
if (ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS)) {
|
||||
corruptedLogs.add(walPath);
|
||||
} else {
|
||||
processedLogs.add(walPath);
|
||||
}
|
||||
archiveWALs(corruptedLogs, processedLogs, oldWALDir, walFS, conf);
|
||||
Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName());
|
||||
walFS.delete(stagingDir, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Moves processed logs to a oldLogDir after successful processing Moves corrupted logs (any log
|
||||
* that couldn't be successfully parsed to corruptDir (.corrupt) for later investigation
|
||||
*/
|
||||
private static void archiveWALs(final List<Path> corruptedWALs, final List<Path> processedWALs,
|
||||
final Path oldWALDir, final FileSystem walFS, final Configuration conf) throws IOException {
|
||||
final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
|
||||
if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
|
||||
LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
|
||||
corruptDir);
|
||||
}
|
||||
if (!walFS.mkdirs(corruptDir)) {
|
||||
LOG.info("Unable to mkdir {}", corruptDir);
|
||||
}
|
||||
walFS.mkdirs(oldWALDir);
|
||||
|
||||
// this method can get restarted or called multiple times for archiving
|
||||
// the same log files.
|
||||
for (Path corruptedWAL : corruptedWALs) {
|
||||
Path p = new Path(corruptDir, corruptedWAL.getName());
|
||||
if (walFS.exists(corruptedWAL)) {
|
||||
if (!walFS.rename(corruptedWAL, p)) {
|
||||
LOG.warn("Unable to move corrupted log {} to {}", corruptedWAL, p);
|
||||
} else {
|
||||
LOG.warn("Moved corrupted log {} to {}", corruptedWAL, p);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (Path p : processedWALs) {
|
||||
Path newPath = AbstractFSWAL.getWALArchivePath(oldWALDir, p);
|
||||
if (walFS.exists(p)) {
|
||||
if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
|
||||
LOG.warn("Unable to move {} to {}", p, newPath);
|
||||
} else {
|
||||
LOG.info("Archived processed log {} to {}", p, newPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Path to a file under RECOVERED_EDITS_DIR directory of the region found in <code>logEntry</code>
|
||||
* named for the sequenceid in the passed <code>logEntry</code>: e.g.
|
||||
* /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of
|
||||
* RECOVERED_EDITS_DIR under the region creating it if necessary.
|
||||
* @param walEntry walEntry to recover
|
||||
* @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
|
||||
* @param tmpDirName of the directory used to sideline old recovered edits file
|
||||
* @param conf configuration
|
||||
* @return Path to file into which to dump split log edits.
|
||||
* @throws IOException
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
@VisibleForTesting
|
||||
static Path getRegionSplitEditsPath(final WAL.Entry walEntry, String fileNameBeingSplit,
|
||||
String tmpDirName, Configuration conf) throws IOException {
|
||||
FileSystem walFS = FSUtils.getWALFileSystem(conf);
|
||||
Path tableDir = FSUtils.getWALTableDir(conf, walEntry.getKey().getTableName());
|
||||
String encodedRegionName = Bytes.toString(walEntry.getKey().getEncodedRegionName());
|
||||
Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
|
||||
Path dir = getRegionDirRecoveredEditsDir(regionDir);
|
||||
|
||||
if (walFS.exists(dir) && walFS.isFile(dir)) {
|
||||
Path tmp = new Path(tmpDirName);
|
||||
if (!walFS.exists(tmp)) {
|
||||
walFS.mkdirs(tmp);
|
||||
}
|
||||
tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
|
||||
LOG.warn("Found existing old file: {}. It could be some "
|
||||
+ "leftover of an old installation. It should be a folder instead. "
|
||||
+ "So moving it to {}",
|
||||
dir, tmp);
|
||||
if (!walFS.rename(dir, tmp)) {
|
||||
LOG.warn("Failed to sideline old file {}", dir);
|
||||
}
|
||||
}
|
||||
|
||||
if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
|
||||
LOG.warn("mkdir failed on {}", dir);
|
||||
}
|
||||
// Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
|
||||
// Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
|
||||
// region's replayRecoveredEdits will not delete it
|
||||
String fileName = formatRecoveredEditsFileName(walEntry.getKey().getSequenceId());
|
||||
fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
|
||||
return new Path(dir, fileName);
|
||||
}
|
||||
|
||||
private static String getTmpRecoveredEditsFileName(String fileName) {
|
||||
return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the completed recovered edits file path, renaming it to be by last edit in the file from
|
||||
* its first edit. Then we could use the name to skip recovered edits when doing
|
||||
* {@link HRegion#replayRecoveredEditsIfAny}.
|
||||
* @return dstPath take file's last edit log seq num as the name
|
||||
*/
|
||||
static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) {
|
||||
String fileName = formatRecoveredEditsFileName(maximumEditWALSeqNum);
|
||||
return new Path(srcPath.getParent(), fileName);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static String formatRecoveredEditsFileName(final long seqid) {
|
||||
return String.format("%019d", seqid);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param regionDir This regions directory in the filesystem.
|
||||
* @return The directory that holds recovered edits files for the region <code>regionDir</code>
|
||||
*/
|
||||
public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
|
||||
return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether there is recovered.edits in the region dir
|
||||
* @param conf conf
|
||||
* @param regionInfo the region to check
|
||||
* @throws IOException IOException
|
||||
* @return true if recovered.edits exist in the region dir
|
||||
*/
|
||||
public static boolean hasRecoveredEdits(final Configuration conf, final RegionInfo regionInfo)
|
||||
throws IOException {
|
||||
// No recovered.edits for non default replica regions
|
||||
if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
|
||||
return false;
|
||||
}
|
||||
// Only default replica region can reach here, so we can use regioninfo
|
||||
// directly without converting it to default replica's regioninfo.
|
||||
Path regionDir =
|
||||
FSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
|
||||
NavigableSet<Path> files = getSplitEditFilesSorted(FSUtils.getWALFileSystem(conf), regionDir);
|
||||
return files != null && !files.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns sorted set of edit files made by splitter, excluding files with '.temp' suffix.
|
||||
* @param walFS WAL FileSystem used to retrieving split edits files.
|
||||
* @param regionDir WAL region dir to look for recovered edits files under.
|
||||
* @return Files in passed <code>regionDir</code> as a sorted set.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
|
||||
final Path regionDir) throws IOException {
|
||||
NavigableSet<Path> filesSorted = new TreeSet<>();
|
||||
Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
|
||||
if (!walFS.exists(editsdir)) {
|
||||
return filesSorted;
|
||||
}
|
||||
FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path p) {
|
||||
boolean result = false;
|
||||
try {
|
||||
// Return files and only files that match the editfile names pattern.
|
||||
// There can be other files in this directory other than edit files.
|
||||
// In particular, on error, we'll move aside the bad edit file giving
|
||||
// it a timestamp suffix. See moveAsideBadEditsFile.
|
||||
Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
|
||||
result = walFS.isFile(p) && m.matches();
|
||||
// Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
|
||||
// because it means splitwal thread is writting this file.
|
||||
if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
|
||||
result = false;
|
||||
}
|
||||
// Skip SeqId Files
|
||||
if (isSequenceIdFile(p)) {
|
||||
result = false;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed isFile check on {}", p, e);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
});
|
||||
if (ArrayUtils.isNotEmpty(files)) {
|
||||
Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
|
||||
}
|
||||
return filesSorted;
|
||||
}
|
||||
|
||||
/**
|
||||
* Move aside a bad edits file.
|
||||
* @param walFS WAL FileSystem used to rename bad edits file.
|
||||
* @param edits Edits file to move aside.
|
||||
* @return The name of the moved aside file.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
|
||||
throws IOException {
|
||||
Path moveAsideName =
|
||||
new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis());
|
||||
if (!walFS.rename(edits, moveAsideName)) {
|
||||
LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
|
||||
}
|
||||
return moveAsideName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Is the given file a region open sequence id file.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static boolean isSequenceIdFile(final Path file) {
|
||||
return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
|
||||
|| file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
|
||||
}
|
||||
|
||||
private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
|
||||
throws IOException {
|
||||
// TODO: Why are we using a method in here as part of our normal region open where
|
||||
// there is no splitting involved? Fix. St.Ack 01/20/2017.
|
||||
Path editsDir = getRegionDirRecoveredEditsDir(regionDir);
|
||||
try {
|
||||
FileStatus[] files = walFS.listStatus(editsDir, WALSplitUtil::isSequenceIdFile);
|
||||
return files != null ? files : new FileStatus[0];
|
||||
} catch (FileNotFoundException e) {
|
||||
return new FileStatus[0];
|
||||
}
|
||||
}
|
||||
|
||||
private static long getMaxSequenceId(FileStatus[] files) {
|
||||
long maxSeqId = -1L;
|
||||
for (FileStatus file : files) {
|
||||
String fileName = file.getPath().getName();
|
||||
try {
|
||||
maxSeqId = Math.max(maxSeqId, Long
|
||||
.parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
|
||||
} catch (NumberFormatException ex) {
|
||||
LOG.warn("Invalid SeqId File Name={}", fileName);
|
||||
}
|
||||
}
|
||||
return maxSeqId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the max sequence id which is stored in the region directory. -1 if none.
|
||||
*/
|
||||
public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
|
||||
return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a file with name as region's max sequence id
|
||||
*/
|
||||
public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
|
||||
throws IOException {
|
||||
FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
|
||||
long maxSeqId = getMaxSequenceId(files);
|
||||
if (maxSeqId > newMaxSeqId) {
|
||||
throw new IOException("The new max sequence id " + newMaxSeqId
|
||||
+ " is less than the old max sequence id " + maxSeqId);
|
||||
}
|
||||
// write a new seqId file
|
||||
Path newSeqIdFile =
|
||||
new Path(getRegionDirRecoveredEditsDir(regionDir), newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
|
||||
if (newMaxSeqId != maxSeqId) {
|
||||
try {
|
||||
if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
|
||||
throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
|
||||
}
|
||||
LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
|
||||
maxSeqId);
|
||||
} catch (FileAlreadyExistsException ignored) {
|
||||
// latest hdfs throws this exception. it's all right if newSeqIdFile already exists
|
||||
}
|
||||
}
|
||||
// remove old ones
|
||||
for (FileStatus status : files) {
|
||||
if (!newSeqIdFile.equals(status.getPath())) {
|
||||
walFS.delete(status.getPath(), false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** A struct used by getMutationsFromWALEntry */
|
||||
public static class MutationReplay implements Comparable<MutationReplay> {
|
||||
public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation,
|
||||
long nonceGroup, long nonce) {
|
||||
this.type = type;
|
||||
this.mutation = mutation;
|
||||
if (this.mutation.getDurability() != Durability.SKIP_WAL) {
|
||||
// using ASYNC_WAL for relay
|
||||
this.mutation.setDurability(Durability.ASYNC_WAL);
|
||||
}
|
||||
this.nonceGroup = nonceGroup;
|
||||
this.nonce = nonce;
|
||||
}
|
||||
|
||||
private final ClientProtos.MutationProto.MutationType type;
|
||||
public final Mutation mutation;
|
||||
public final long nonceGroup;
|
||||
public final long nonce;
|
||||
|
||||
@Override
|
||||
public int compareTo(final MutationReplay d) {
|
||||
return this.mutation.compareTo(d.mutation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (!(obj instanceof MutationReplay)) {
|
||||
return false;
|
||||
} else {
|
||||
return this.compareTo((MutationReplay) obj) == 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return this.mutation.hashCode();
|
||||
}
|
||||
|
||||
public ClientProtos.MutationProto.MutationType getType() {
|
||||
return type;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is used to construct mutations from a WALEntry. It also reconstructs WALKey &
|
||||
* WALEdit from the passed in WALEntry
|
||||
* @param entry
|
||||
* @param cells
|
||||
* @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
|
||||
* extracted from the passed in WALEntry.
|
||||
* @return list of Pair<MutationType, Mutation> to be replayed
|
||||
* @throws IOException
|
||||
*/
|
||||
public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry,
|
||||
CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
|
||||
if (entry == null) {
|
||||
// return an empty array
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
long replaySeqId =
|
||||
(entry.getKey().hasOrigSequenceNumber()) ? entry.getKey().getOrigSequenceNumber()
|
||||
: entry.getKey().getLogSequenceNumber();
|
||||
int count = entry.getAssociatedCellCount();
|
||||
List<MutationReplay> mutations = new ArrayList<>();
|
||||
Cell previousCell = null;
|
||||
Mutation m = null;
|
||||
WALKeyImpl key = null;
|
||||
WALEdit val = null;
|
||||
if (logEntry != null) {
|
||||
val = new WALEdit();
|
||||
}
|
||||
|
||||
for (int i = 0; i < count; i++) {
|
||||
// Throw index out of bounds if our cell count is off
|
||||
if (!cells.advance()) {
|
||||
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
|
||||
}
|
||||
Cell cell = cells.current();
|
||||
if (val != null) val.add(cell);
|
||||
|
||||
boolean isNewRowOrType =
|
||||
previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
|
||||
|| !CellUtil.matchingRows(previousCell, cell);
|
||||
if (isNewRowOrType) {
|
||||
// Create new mutation
|
||||
if (CellUtil.isDelete(cell)) {
|
||||
m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
|
||||
// Deletes don't have nonces.
|
||||
mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.DELETE, m,
|
||||
HConstants.NO_NONCE, HConstants.NO_NONCE));
|
||||
} else {
|
||||
m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
|
||||
// Puts might come from increment or append, thus we need nonces.
|
||||
long nonceGroup =
|
||||
entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
|
||||
long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
|
||||
mutations.add(
|
||||
new MutationReplay(ClientProtos.MutationProto.MutationType.PUT, m, nonceGroup, nonce));
|
||||
}
|
||||
}
|
||||
if (CellUtil.isDelete(cell)) {
|
||||
((Delete) m).add(cell);
|
||||
} else {
|
||||
((Put) m).add(cell);
|
||||
}
|
||||
m.setDurability(durability);
|
||||
previousCell = cell;
|
||||
}
|
||||
|
||||
// reconstruct WALKey
|
||||
if (logEntry != null) {
|
||||
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
|
||||
entry.getKey();
|
||||
List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
|
||||
for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
|
||||
clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
|
||||
}
|
||||
key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(),
|
||||
TableName.valueOf(walKeyProto.getTableName().toByteArray()), replaySeqId,
|
||||
walKeyProto.getWriteTime(), clusterIds, walKeyProto.getNonceGroup(),
|
||||
walKeyProto.getNonce(), null);
|
||||
logEntry.setFirst(key);
|
||||
logEntry.setSecond(val);
|
||||
}
|
||||
|
||||
return mutations;
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -81,7 +81,7 @@ import org.apache.hadoop.hbase.wal.WAL;
|
|||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.junit.After;
|
||||
|
@ -226,14 +226,14 @@ public abstract class AbstractTestDLS {
|
|||
for (RegionInfo hri : regions) {
|
||||
Path tdir = FSUtils.getWALTableDir(conf, table);
|
||||
@SuppressWarnings("deprecation")
|
||||
Path editsdir = WALSplitter
|
||||
Path editsdir = WALSplitUtil
|
||||
.getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf,
|
||||
tableName, hri.getEncodedName()));
|
||||
LOG.debug("checking edits dir " + editsdir);
|
||||
FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path p) {
|
||||
if (WALSplitter.isSequenceIdFile(p)) {
|
||||
if (WALSplitUtil.isSequenceIdFile(p)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
|
|||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
|
@ -163,7 +163,7 @@ public class TestDeleteColumnFamilyProcedureFromClient {
|
|||
FileStatus[] cf = fs.listStatus(fileStatus[i].getPath(), new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path p) {
|
||||
if (WALSplitter.isSequenceIdFile(p)) {
|
||||
if (WALSplitUtil.isSequenceIdFile(p)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
@ -244,7 +244,7 @@ public class TestDeleteColumnFamilyProcedureFromClient {
|
|||
FileStatus[] cf = fs.listStatus(fileStatus[i].getPath(), new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path p) {
|
||||
if (WALSplitter.isSequenceIdFile(p)) {
|
||||
if (WALSplitUtil.isSequenceIdFile(p)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -160,7 +160,7 @@ import org.apache.hadoop.hbase.wal.WALFactory;
|
|||
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -685,7 +685,7 @@ public class TestHRegion {
|
|||
FileSystem fs = region.getRegionFileSystem().getFileSystem();
|
||||
byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
|
||||
|
||||
Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
|
||||
Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
|
||||
|
||||
long maxSeqId = 1050;
|
||||
long minSeqId = 1000;
|
||||
|
@ -736,7 +736,7 @@ public class TestHRegion {
|
|||
FileSystem fs = region.getRegionFileSystem().getFileSystem();
|
||||
byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
|
||||
|
||||
Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
|
||||
Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
|
||||
|
||||
long maxSeqId = 1050;
|
||||
long minSeqId = 1000;
|
||||
|
@ -789,7 +789,7 @@ public class TestHRegion {
|
|||
Path regiondir = region.getRegionFileSystem().getRegionDir();
|
||||
FileSystem fs = region.getRegionFileSystem().getFileSystem();
|
||||
|
||||
Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
|
||||
Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
|
||||
for (int i = 1000; i < 1050; i += 10) {
|
||||
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
|
||||
FSDataOutputStream dos = fs.create(recoveredEdits);
|
||||
|
@ -822,7 +822,7 @@ public class TestHRegion {
|
|||
|
||||
assertEquals(0, region.getStoreFileList(columns).size());
|
||||
|
||||
Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
|
||||
Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
|
||||
|
||||
long maxSeqId = 1050;
|
||||
long minSeqId = 1000;
|
||||
|
@ -938,7 +938,7 @@ public class TestHRegion {
|
|||
WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
|
||||
this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
|
||||
|
||||
Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
|
||||
Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
|
||||
|
||||
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
|
||||
fs.create(recoveredEdits);
|
||||
|
@ -1063,7 +1063,7 @@ public class TestHRegion {
|
|||
|
||||
// now write those markers to the recovered edits again.
|
||||
|
||||
Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
|
||||
Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
|
||||
|
||||
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
|
||||
fs.create(recoveredEdits);
|
||||
|
|
|
@ -77,7 +77,7 @@ import org.apache.hadoop.hbase.wal.WAL;
|
|||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
|
|
|
@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.wal.WAL;
|
|||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
|
@ -149,7 +149,7 @@ public class TestRecoveredEdits {
|
|||
assertTrue(storeFiles.isEmpty());
|
||||
region.close();
|
||||
Path regionDir = region.getRegionDir(hbaseRootDir, hri);
|
||||
Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
|
||||
Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regionDir);
|
||||
// This is a little fragile getting this path to a file of 10M of edits.
|
||||
Path recoveredEditsFile = new Path(
|
||||
System.getProperty("test.build.classes", "target/test-classes"),
|
||||
|
|
|
@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
|
|||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -146,7 +146,7 @@ public class TestRecoveredEditsReplayAndAbort {
|
|||
FileSystem fs = region.getRegionFileSystem().getFileSystem();
|
||||
byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
|
||||
|
||||
Path recoveredEditsDir = WALSplitter
|
||||
Path recoveredEditsDir = WALSplitUtil
|
||||
.getRegionDirRecoveredEditsDir(regiondir);
|
||||
long maxSeqId = 1200;
|
||||
long minSeqId = 1000;
|
||||
|
|
|
@ -96,6 +96,7 @@ import org.apache.hadoop.hbase.wal.WAL;
|
|||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||
import org.junit.After;
|
||||
|
@ -902,15 +903,12 @@ public abstract class AbstractTestWALReplay {
|
|||
assertTrue(listStatus.length > 0);
|
||||
WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
|
||||
this.fs, this.conf, null, null, null, wals);
|
||||
FileStatus[] listStatus1 = this.fs.listStatus(
|
||||
new Path(FSUtils.getWALTableDir(conf, tableName), new Path(hri.getEncodedName(),
|
||||
"recovered.edits")), new PathFilter() {
|
||||
FileStatus[] listStatus1 = this.fs.listStatus(new Path(FSUtils.getWALTableDir(conf, tableName),
|
||||
new Path(hri.getEncodedName(), "recovered.edits")),
|
||||
new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path p) {
|
||||
if (WALSplitter.isSequenceIdFile(p)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
return !WALSplitUtil.isSequenceIdFile(p);
|
||||
}
|
||||
});
|
||||
int editCount = 0;
|
||||
|
@ -956,7 +954,7 @@ public abstract class AbstractTestWALReplay {
|
|||
runWALSplit(this.conf);
|
||||
|
||||
// here we let the DFSInputStream throw an IOException just after the WALHeader.
|
||||
Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first();
|
||||
Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first();
|
||||
FSDataInputStream stream = fs.open(editFile);
|
||||
stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
|
||||
Class<? extends AbstractFSWALProvider.Reader> logReaderClass =
|
||||
|
|
|
@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
|
@ -167,13 +167,13 @@ public class TestRestoreSnapshotHelper {
|
|||
region.initialize();
|
||||
Path recoveredEdit =
|
||||
FSUtils.getWALRegionDir(conf, tableName, region.getRegionInfo().getEncodedName());
|
||||
long maxSeqId = WALSplitter.getMaxRegionSequenceId(fs, recoveredEdit);
|
||||
long maxSeqId = WALSplitUtil.getMaxRegionSequenceId(fs, recoveredEdit);
|
||||
|
||||
// open restored region without set restored flag
|
||||
HRegion region2 = HRegion.newHRegion(FSUtils.getTableDir(restoreDir, tableName), null, fs,
|
||||
conf, restoredRegion, htd, null);
|
||||
region2.initialize();
|
||||
long maxSeqId2 = WALSplitter.getMaxRegionSequenceId(fs, recoveredEdit);
|
||||
long maxSeqId2 = WALSplitUtil.getMaxRegionSequenceId(fs, recoveredEdit);
|
||||
Assert.assertTrue(maxSeqId2 > maxSeqId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -66,30 +66,30 @@ public class TestReadWriteSeqIdFiles {
|
|||
|
||||
@Test
|
||||
public void test() throws IOException {
|
||||
WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1000L);
|
||||
assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR));
|
||||
WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 2000L);
|
||||
assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR));
|
||||
WALSplitUtil.writeRegionSequenceIdFile(walFS, REGION_DIR, 1000L);
|
||||
assertEquals(1000L, WALSplitUtil.getMaxRegionSequenceId(walFS, REGION_DIR));
|
||||
WALSplitUtil.writeRegionSequenceIdFile(walFS, REGION_DIR, 2000L);
|
||||
assertEquals(2000L, WALSplitUtil.getMaxRegionSequenceId(walFS, REGION_DIR));
|
||||
// can not write a sequence id which is smaller
|
||||
try {
|
||||
WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1500L);
|
||||
WALSplitUtil.writeRegionSequenceIdFile(walFS, REGION_DIR, 1500L);
|
||||
} catch (IOException e) {
|
||||
// expected
|
||||
LOG.info("Expected error", e);
|
||||
}
|
||||
|
||||
Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(REGION_DIR);
|
||||
Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(REGION_DIR);
|
||||
FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path p) {
|
||||
return WALSplitter.isSequenceIdFile(p);
|
||||
return WALSplitUtil.isSequenceIdFile(p);
|
||||
}
|
||||
});
|
||||
// only one seqid file should exist
|
||||
assertEquals(1, files.length);
|
||||
|
||||
// verify all seqId files aren't treated as recovered.edits files
|
||||
NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(walFS, REGION_DIR);
|
||||
NavigableSet<Path> recoveredEdits = WALSplitUtil.getSplitEditFilesSorted(walFS, REGION_DIR);
|
||||
assertEquals(0, recoveredEdits.size());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
|||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -89,17 +88,17 @@ public class TestWALMethods {
|
|||
Path regiondir = util.getDataTestDir("regiondir");
|
||||
fs.delete(regiondir, true);
|
||||
fs.mkdirs(regiondir);
|
||||
Path recoverededits = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
|
||||
String first = WALSplitter.formatRecoveredEditsFileName(-1);
|
||||
Path recoverededits = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
|
||||
String first = WALSplitUtil.formatRecoveredEditsFileName(-1);
|
||||
createFile(fs, recoverededits, first);
|
||||
createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(0));
|
||||
createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(1));
|
||||
createFile(fs, recoverededits, WALSplitter
|
||||
createFile(fs, recoverededits, WALSplitUtil.formatRecoveredEditsFileName(0));
|
||||
createFile(fs, recoverededits, WALSplitUtil.formatRecoveredEditsFileName(1));
|
||||
createFile(fs, recoverededits, WALSplitUtil
|
||||
.formatRecoveredEditsFileName(11));
|
||||
createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(2));
|
||||
createFile(fs, recoverededits, WALSplitter
|
||||
createFile(fs, recoverededits, WALSplitUtil.formatRecoveredEditsFileName(2));
|
||||
createFile(fs, recoverededits, WALSplitUtil
|
||||
.formatRecoveredEditsFileName(50));
|
||||
String last = WALSplitter.formatRecoveredEditsFileName(Long.MAX_VALUE);
|
||||
String last = WALSplitUtil.formatRecoveredEditsFileName(Long.MAX_VALUE);
|
||||
createFile(fs, recoverededits, last);
|
||||
createFile(fs, recoverededits,
|
||||
Long.toString(Long.MAX_VALUE) + "." + System.currentTimeMillis());
|
||||
|
@ -108,21 +107,21 @@ public class TestWALMethods {
|
|||
FSUtils.setRootDir(walConf, regiondir);
|
||||
(new WALFactory(walConf, "dummyLogName")).getWAL(null);
|
||||
|
||||
NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
|
||||
NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
|
||||
assertEquals(7, files.size());
|
||||
assertEquals(files.pollFirst().getName(), first);
|
||||
assertEquals(files.pollLast().getName(), last);
|
||||
assertEquals(files.pollFirst().getName(),
|
||||
WALSplitter
|
||||
WALSplitUtil
|
||||
.formatRecoveredEditsFileName(0));
|
||||
assertEquals(files.pollFirst().getName(),
|
||||
WALSplitter
|
||||
WALSplitUtil
|
||||
.formatRecoveredEditsFileName(1));
|
||||
assertEquals(files.pollFirst().getName(),
|
||||
WALSplitter
|
||||
WALSplitUtil
|
||||
.formatRecoveredEditsFileName(2));
|
||||
assertEquals(files.pollFirst().getName(),
|
||||
WALSplitter
|
||||
WALSplitUtil
|
||||
.formatRecoveredEditsFileName(11));
|
||||
}
|
||||
|
||||
|
|
|
@ -399,7 +399,7 @@ public class TestWALSplit {
|
|||
Path regiondir = new Path(tdir,
|
||||
RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
|
||||
fs.mkdirs(regiondir);
|
||||
Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
|
||||
Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
|
||||
assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
|
||||
fs.createNewFile(parent); // create a recovered.edits file
|
||||
String parentOfParent = p.getParent().getParent().getName();
|
||||
|
@ -414,7 +414,7 @@ public class TestWALSplit {
|
|||
new Entry(new WALKeyImpl(encoded,
|
||||
TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
|
||||
new WALEdit());
|
||||
Path p = WALSplitter.getRegionSplitEditsPath(entry,
|
||||
Path p = WALSplitUtil.getRegionSplitEditsPath(entry,
|
||||
FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
|
||||
return p;
|
||||
}
|
||||
|
@ -422,10 +422,10 @@ public class TestWALSplit {
|
|||
@Test
|
||||
public void testHasRecoveredEdits() throws IOException {
|
||||
Path p = createRecoveredEditsPathForRegion();
|
||||
assertFalse(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
|
||||
assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
|
||||
String renamedEdit = p.getName().split("-")[0];
|
||||
fs.createNewFile(new Path(p.getParent(), renamedEdit));
|
||||
assertTrue(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
|
||||
assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
|
||||
}
|
||||
|
||||
private void useDifferentDFSClient() throws IOException {
|
||||
|
@ -1157,7 +1157,7 @@ public class TestWALSplit {
|
|||
// After creating writer, simulate region's
|
||||
// replayRecoveredEditsIfAny() which gets SplitEditFiles of this
|
||||
// region and delete them, excluding files with '.temp' suffix.
|
||||
NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
|
||||
NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
|
||||
if (files != null && !files.isEmpty()) {
|
||||
for (Path file : files) {
|
||||
if (!this.walFS.delete(file, false)) {
|
||||
|
@ -1243,12 +1243,12 @@ public class TestWALSplit {
|
|||
throws IOException {
|
||||
Path tdir = FSUtils.getWALTableDir(conf, table);
|
||||
@SuppressWarnings("deprecation")
|
||||
Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
|
||||
Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
|
||||
Bytes.toString(Bytes.toBytes(region))));
|
||||
FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path p) {
|
||||
if (WALSplitter.isSequenceIdFile(p)) {
|
||||
if (WALSplitUtil.isSequenceIdFile(p)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
|
Loading…
Reference in New Issue