HBASE-20037 Race when calling SequenceIdAccounting.resetHighest

This commit is contained in:
zhangduo 2018-02-21 18:19:01 +08:00 committed by Michael Stack
parent 2b4df5e36e
commit 30c2dcd883
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
6 changed files with 75 additions and 67 deletions

View File

@ -661,9 +661,26 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
} }
} }
protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
int oldNumEntries = this.numEntries.getAndSet(0);
String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
if (oldPath != null) {
this.walFile2Props.put(oldPath,
new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
this.totalLogSize.addAndGet(oldFileLen);
LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
newPathString);
} else {
LOG.info("New WAL {}", newPathString);
}
}
/** /**
* <p>
* Cleans up current writer closing it and then puts in place the passed in * Cleans up current writer closing it and then puts in place the passed in
* <code>nextWriter</code>. * <code>nextWriter</code>.
* </p>
* <p> * <p>
* <ul> * <ul>
* <li>In the case of creating a new WAL, oldPath will be null.</li> * <li>In the case of creating a new WAL, oldPath will be null.</li>
@ -672,26 +689,17 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
* null.</li> * null.</li>
* </ul> * </ul>
* </p>
* @param oldPath may be null * @param oldPath may be null
* @param newPath may be null * @param newPath may be null
* @param nextWriter may be null * @param nextWriter may be null
* @return the passed in <code>newPath</code> * @return the passed in <code>newPath</code>
* @throws IOException if there is a problem flushing or closing the underlying FS * @throws IOException if there is a problem flushing or closing the underlying FS
*/ */
@VisibleForTesting
Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) { try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) {
long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter); doReplaceWriter(oldPath, newPath, nextWriter);
int oldNumEntries = this.numEntries.getAndSet(0);
final String newPathString = (null == newPath ? null : CommonFSUtils.getPath(newPath));
if (oldPath != null) {
this.walFile2Props.put(oldPath,
new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
this.totalLogSize.addAndGet(oldFileLen);
LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
} else {
LOG.info("New WAL " + newPathString);
}
return newPath; return newPath;
} }
} }
@ -1021,10 +1029,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected abstract W createWriterInstance(Path path) protected abstract W createWriterInstance(Path path)
throws IOException, CommonFSUtils.StreamLacksCapabilityException; throws IOException, CommonFSUtils.StreamLacksCapabilityException;
/** protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
* @return old wal file size
*/
protected abstract long doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
throws IOException; throws IOException;
protected abstract void doShutdown() throws IOException; protected abstract void doShutdown() throws IOException;

View File

@ -631,11 +631,29 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
} }
} }
private long closeWriter() {
AsyncWriter oldWriter = this.writer;
if (oldWriter != null) {
long fileLength = oldWriter.getLength();
closeExecutor.execute(() -> {
try {
oldWriter.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
}
});
return fileLength;
} else {
return 0L;
}
}
@Override @Override
protected long doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter) protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
throws IOException { throws IOException {
waitForSafePoint(); waitForSafePoint();
final AsyncWriter oldWriter = this.writer; long oldFileLen = closeWriter();
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
this.writer = nextWriter; this.writer = nextWriter;
if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) { if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) {
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
@ -654,13 +672,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
} finally { } finally {
consumeLock.unlock(); consumeLock.unlock();
} }
return executeClose(closeExecutor, oldWriter);
} }
@Override @Override
protected void doShutdown() throws IOException { protected void doShutdown() throws IOException {
waitForSafePoint(); waitForSafePoint();
executeClose(closeExecutor, writer); closeWriter();
closeExecutor.shutdown(); closeExecutor.shutdown();
try { try {
if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
@ -698,23 +715,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
} }
} }
private static long executeClose(ExecutorService closeExecutor, AsyncWriter writer) {
long fileLength;
if (writer != null) {
fileLength = writer.getLength();
closeExecutor.execute(() -> {
try {
writer.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
}
});
} else {
fileLength = 0L;
}
return fileLength;
}
@Override @Override
protected void doAppend(AsyncWriter writer, FSWALEntry entry) { protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
writer.append(entry); writer.append(entry);

View File

@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
@ -34,13 +33,14 @@ import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;

View File

@ -298,7 +298,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
} }
@Override @Override
protected long doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException { protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException {
// Ask the ring buffer writer to pause at a safe point. Once we do this, the writer // Ask the ring buffer writer to pause at a safe point. Once we do this, the writer
// thread will eventually pause. An error hereafter needs to release the writer thread // thread will eventually pause. An error hereafter needs to release the writer thread
// regardless -- hence the finally block below. Note, this method is called from the FSHLog // regardless -- hence the finally block below. Note, this method is called from the FSHLog
@ -320,7 +320,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
zigzagLatch = this.ringBufferEventHandler.attainSafePoint(); zigzagLatch = this.ringBufferEventHandler.attainSafePoint();
} }
afterCreatingZigZagLatch(); afterCreatingZigZagLatch();
long oldFileLen = 0L;
try { try {
// Wait on the safe point to be achieved. Send in a sync in case nothing has hit the // Wait on the safe point to be achieved. Send in a sync in case nothing has hit the
// ring buffer between the above notification of writer that we want it to go to // ring buffer between the above notification of writer that we want it to go to
@ -343,6 +342,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
LOG.warn( LOG.warn(
"Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage()); "Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage());
} }
long oldFileLen = 0L;
// It is at the safe point. Swap out writer from under the blocked writer thread. // It is at the safe point. Swap out writer from under the blocked writer thread.
// TODO: This is close is inline with critical section. Should happen in background? // TODO: This is close is inline with critical section. Should happen in background?
if (this.writer != null) { if (this.writer != null) {
@ -363,6 +363,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
} }
} }
} }
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
this.writer = nextWriter; this.writer = nextWriter;
if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) { if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {
this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream(); this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();
@ -397,7 +398,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
} }
} }
} }
return oldFileLen;
} }
@Override @Override

View File

@ -1,5 +1,4 @@
/** /**
*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -16,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException; import java.io.IOException;
@ -25,15 +23,16 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
/** /**
* Writer for protobuf-based WAL. * Writer for protobuf-based WAL.

View File

@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -29,25 +27,28 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ImmutableByteArray;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ImmutableByteArray; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/** /**
* <p>
* Accounting of sequence ids per region and then by column family. So we can our accounting * Accounting of sequence ids per region and then by column family. So we can our accounting
* current, call startCacheFlush and then finishedCacheFlush or abortCacheFlush so this instance can * current, call startCacheFlush and then finishedCacheFlush or abortCacheFlush so this instance can
* keep abreast of the state of sequence id persistence. Also call update per append. * keep abreast of the state of sequence id persistence. Also call update per append.
* </p>
* <p> * <p>
* For the implementation, we assume that all the {@code encodedRegionName} passed in is gotten by * For the implementation, we assume that all the {@code encodedRegionName} passed in is gotten by
* {@link HRegionInfo#getEncodedNameAsBytes()}. So it is safe to use it as a hash key. And for * {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()}. So it is safe to use
* family name, we use {@link ImmutableByteArray} as key. This is because hash based map is much * it as a hash key. And for family name, we use {@link ImmutableByteArray} as key. This is because
* faster than RBTree or CSLM and here we are on the critical write path. See HBASE-16278 for more * hash based map is much faster than RBTree or CSLM and here we are on the critical write path. See
* details. * HBASE-16278 for more details.
* </p>
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class SequenceIdAccounting { class SequenceIdAccounting {
@ -93,14 +94,17 @@ class SequenceIdAccounting {
*/ */
private final Map<byte[], Map<ImmutableByteArray, Long>> flushingSequenceIds = new HashMap<>(); private final Map<byte[], Map<ImmutableByteArray, Long>> flushingSequenceIds = new HashMap<>();
/** /**
* Map of region encoded names to the latest/highest region sequence id. Updated on each * <p>
* call to append. * Map of region encoded names to the latest/highest region sequence id. Updated on each call to
* <p> * append.
* This map uses byte[] as the key, and uses reference equality. It works in our use case as we * </p>
* use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns * <p>
* the same array. * This map uses byte[] as the key, and uses reference equality. It works in our use case as we
*/ * use {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()} as keys. For a
* given region, it always returns the same array.
* </p>
*/
private Map<byte[], Long> highestSequenceIds = new HashMap<>(); private Map<byte[], Long> highestSequenceIds = new HashMap<>();
/** /**