HBASE-20037 Race when calling SequenceIdAccounting.resetHighest

This commit is contained in:
zhangduo 2018-02-21 18:19:01 +08:00 committed by Michael Stack
parent 92d04d5751
commit a27ef55a40
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
6 changed files with 75 additions and 67 deletions

View File

@ -661,9 +661,26 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
}
protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
int oldNumEntries = this.numEntries.getAndSet(0);
String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
if (oldPath != null) {
this.walFile2Props.put(oldPath,
new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
this.totalLogSize.addAndGet(oldFileLen);
LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
newPathString);
} else {
LOG.info("New WAL {}", newPathString);
}
}
/**
* <p>
* Cleans up current writer closing it and then puts in place the passed in
* <code>nextWriter</code>.
* </p>
* <p>
* <ul>
* <li>In the case of creating a new WAL, oldPath will be null.</li>
@ -672,26 +689,17 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
* null.</li>
* </ul>
* </p>
* @param oldPath may be null
* @param newPath may be null
* @param nextWriter may be null
* @return the passed in <code>newPath</code>
* @throws IOException if there is a problem flushing or closing the underlying FS
*/
@VisibleForTesting
Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) {
long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter);
int oldNumEntries = this.numEntries.getAndSet(0);
final String newPathString = (null == newPath ? null : CommonFSUtils.getPath(newPath));
if (oldPath != null) {
this.walFile2Props.put(oldPath,
new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
this.totalLogSize.addAndGet(oldFileLen);
LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
} else {
LOG.info("New WAL " + newPathString);
}
doReplaceWriter(oldPath, newPath, nextWriter);
return newPath;
}
}
@ -1021,10 +1029,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected abstract W createWriterInstance(Path path)
throws IOException, CommonFSUtils.StreamLacksCapabilityException;
/**
* @return old wal file size
*/
protected abstract long doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
throws IOException;
protected abstract void doShutdown() throws IOException;

View File

@ -631,11 +631,29 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
private long closeWriter() {
AsyncWriter oldWriter = this.writer;
if (oldWriter != null) {
long fileLength = oldWriter.getLength();
closeExecutor.execute(() -> {
try {
oldWriter.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
}
});
return fileLength;
} else {
return 0L;
}
}
@Override
protected long doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
throws IOException {
waitForSafePoint();
final AsyncWriter oldWriter = this.writer;
long oldFileLen = closeWriter();
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
this.writer = nextWriter;
if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) {
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
@ -654,13 +672,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
} finally {
consumeLock.unlock();
}
return executeClose(closeExecutor, oldWriter);
}
@Override
protected void doShutdown() throws IOException {
waitForSafePoint();
executeClose(closeExecutor, writer);
closeWriter();
closeExecutor.shutdown();
try {
if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
@ -698,23 +715,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
private static long executeClose(ExecutorService closeExecutor, AsyncWriter writer) {
long fileLength;
if (writer != null) {
fileLength = writer.getLength();
closeExecutor.execute(() -> {
try {
writer.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
}
});
} else {
fileLength = 0L;
}
return fileLength;
}
@Override
protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
writer.append(entry);

View File

@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -34,13 +33,14 @@ import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;

View File

@ -298,7 +298,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
@Override
protected long doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException {
protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException {
// Ask the ring buffer writer to pause at a safe point. Once we do this, the writer
// thread will eventually pause. An error hereafter needs to release the writer thread
// regardless -- hence the finally block below. Note, this method is called from the FSHLog
@ -320,7 +320,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
zigzagLatch = this.ringBufferEventHandler.attainSafePoint();
}
afterCreatingZigZagLatch();
long oldFileLen = 0L;
try {
// Wait on the safe point to be achieved. Send in a sync in case nothing has hit the
// ring buffer between the above notification of writer that we want it to go to
@ -343,6 +342,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
LOG.warn(
"Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage());
}
long oldFileLen = 0L;
// It is at the safe point. Swap out writer from under the blocked writer thread.
// TODO: This is close is inline with critical section. Should happen in background?
if (this.writer != null) {
@ -363,6 +363,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
}
}
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
this.writer = nextWriter;
if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {
this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();
@ -397,7 +398,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
}
}
return oldFileLen;
}
@Override

View File

@ -1,5 +1,4 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -16,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
@ -25,15 +23,16 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
/**
* Writer for protobuf-based WAL.

View File

@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -29,25 +27,28 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ImmutableByteArray;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ImmutableByteArray;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* <p>
* Accounting of sequence ids per region and then by column family. So we can our accounting
* current, call startCacheFlush and then finishedCacheFlush or abortCacheFlush so this instance can
* keep abreast of the state of sequence id persistence. Also call update per append.
* </p>
* <p>
* For the implementation, we assume that all the {@code encodedRegionName} passed in is gotten by
* {@link HRegionInfo#getEncodedNameAsBytes()}. So it is safe to use it as a hash key. And for
* family name, we use {@link ImmutableByteArray} as key. This is because hash based map is much
* faster than RBTree or CSLM and here we are on the critical write path. See HBASE-16278 for more
* details.
* {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()}. So it is safe to use
* it as a hash key. And for family name, we use {@link ImmutableByteArray} as key. This is because
* hash based map is much faster than RBTree or CSLM and here we are on the critical write path. See
* HBASE-16278 for more details.
* </p>
*/
@InterfaceAudience.Private
class SequenceIdAccounting {
@ -94,12 +95,15 @@ class SequenceIdAccounting {
private final Map<byte[], Map<ImmutableByteArray, Long>> flushingSequenceIds = new HashMap<>();
/**
* Map of region encoded names to the latest/highest region sequence id. Updated on each
* call to append.
* <p>
* Map of region encoded names to the latest/highest region sequence id. Updated on each call to
* append.
* </p>
* <p>
* This map uses byte[] as the key, and uses reference equality. It works in our use case as we
* use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns
* the same array.
* use {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()} as keys. For a
* given region, it always returns the same array.
* </p>
*/
private Map<byte[], Long> highestSequenceIds = new HashMap<>();