HBASE-19372 Remove the Span object in SyncFuture as it is useless now

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
zhangduo 2017-11-29 21:07:02 +08:00 committed by Michael Stack
parent 6f7d2afcdd
commit 22b90c4a64
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
5 changed files with 47 additions and 93 deletions

View File

@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.shaded.com.google.common.base.Precondition
import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.checkNotNull; import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import com.lmax.disruptor.RingBuffer;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.lang.management.MemoryType; import java.lang.management.MemoryType;
@ -59,7 +61,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.CollectionUtils;
@ -74,11 +75,10 @@ import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import com.lmax.disruptor.RingBuffer; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
/** /**
* Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
@ -696,13 +696,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
} }
} }
protected Span blockOnSync(final SyncFuture syncFuture) throws IOException { protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
// Now we have published the ringbuffer, halt the current thread until we get an answer back. // Now we have published the ringbuffer, halt the current thread until we get an answer back.
try { try {
if (syncFuture != null) { if (syncFuture != null) {
syncFuture.get(walSyncTimeoutNs); syncFuture.get(walSyncTimeoutNs);
} }
return (syncFuture == null) ? null : syncFuture.getSpan();
} catch (TimeoutIOException tioe) { } catch (TimeoutIOException tioe) {
// SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
// still refer to it, so if this thread use it next time may get a wrong // still refer to it, so if this thread use it next time may get a wrong
@ -792,7 +791,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* Get the backing files associated with this WAL. * Get the backing files associated with this WAL.
* @return may be null if there are no files. * @return may be null if there are no files.
*/ */
protected FileStatus[] getFiles() throws IOException { @VisibleForTesting
FileStatus[] getFiles() throws IOException {
return CommonFSUtils.listStatus(fs, walDir, ourFiles); return CommonFSUtils.listStatus(fs, walDir, ourFiles);
} }
@ -862,13 +862,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater); sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
} }
protected SyncFuture getSyncFuture(long sequence, Span span) { protected final SyncFuture getSyncFuture(long sequence) {
return CollectionUtils return CollectionUtils
.computeIfAbsent(syncFuturesByHandler, Thread.currentThread(), SyncFuture::new) .computeIfAbsent(syncFuturesByHandler, Thread.currentThread(), SyncFuture::new)
.reset(sequence, span); .reset(sequence);
} }
protected void requestLogRoll(boolean tooFewReplicas) { protected final void requestLogRoll(boolean tooFewReplicas) {
if (!this.listeners.isEmpty()) { if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) { for (WALActionsListener i : this.listeners) {
i.logRollRequested(tooFewReplicas); i.logRollRequested(tooFewReplicas);
@ -894,7 +894,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// Noop // Noop
} }
protected boolean append(W writer, FSWALEntry entry) throws IOException { protected final boolean append(W writer, FSWALEntry entry) throws IOException {
// TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc. // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
atHeadOfRingBufferEventHandlerAppend(); atHeadOfRingBufferEventHandlerAppend();
long start = EnvironmentEdgeManager.currentTime(); long start = EnvironmentEdgeManager.currentTime();
@ -940,7 +940,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
return len; return len;
} }
protected void postSync(final long timeInNanos, final int handlerSyncs) { protected final void postSync(final long timeInNanos, final int handlerSyncs) {
if (timeInNanos > this.slowSyncNs) { if (timeInNanos > this.slowSyncNs) {
String msg = new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000) String msg = new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000)
.append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString(); .append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString();
@ -954,11 +954,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
} }
} }
protected long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKey key, WALEdit edits, protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKey key,
boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
throws IOException { throws IOException {
if (this.closed) { if (this.closed) {
throw new IOException("Cannot append; log is closed, regionName = " + hri.getRegionNameAsString()); throw new IOException(
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
} }
MutableLong txidHolder = new MutableLong(); MutableLong txidHolder = new MutableLong();
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> { MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
@ -968,10 +969,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) { try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore); FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
entry.stampRegionSequenceId(we); entry.stampRegionSequenceId(we);
if(scope!=null){ if (scope != null) {
ringBuffer.get(txid).load(entry, scope.getSpan()); ringBuffer.get(txid).load(entry, scope.getSpan());
} } else {
else{
ringBuffer.get(txid).load(entry, null); ringBuffer.get(txid).load(entry, null);
} }
} finally { } finally {

View File

@ -565,24 +565,18 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
public void sync() throws IOException { public void sync() throws IOException {
try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")){ try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")){
long txid = waitingConsumePayloads.next(); long txid = waitingConsumePayloads.next();
SyncFuture future = null; SyncFuture future;
try { try {
if (scope != null) { future = getSyncFuture(txid);
future = getSyncFuture(txid, scope.getSpan()); RingBufferTruck truck = waitingConsumePayloads.get(txid);
RingBufferTruck truck = waitingConsumePayloads.get(txid); truck.load(future);
truck.load(future);
}
} finally { } finally {
waitingConsumePayloads.publish(txid); waitingConsumePayloads.publish(txid);
} }
if (shouldScheduleConsumer()) { if (shouldScheduleConsumer()) {
eventLoop.execute(consumer); eventLoop.execute(consumer);
} }
//TODO handle htrace API change, see HBASE-18895 blockOnSync(future);
//scope = Trace.continueSpan(blockOnSync(future));
if (future != null) {
blockOnSync(future);
}
} }
} }
@ -594,24 +588,18 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) { try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
// here we do not use ring buffer sequence as txid // here we do not use ring buffer sequence as txid
long sequence = waitingConsumePayloads.next(); long sequence = waitingConsumePayloads.next();
SyncFuture future = null; SyncFuture future;
try { try {
if(scope!= null) { future = getSyncFuture(txid);
future = getSyncFuture(txid, scope.getSpan()); RingBufferTruck truck = waitingConsumePayloads.get(sequence);
RingBufferTruck truck = waitingConsumePayloads.get(sequence); truck.load(future);
truck.load(future);
}
} finally { } finally {
waitingConsumePayloads.publish(sequence); waitingConsumePayloads.publish(sequence);
} }
if (shouldScheduleConsumer()) { if (shouldScheduleConsumer()) {
eventLoop.execute(consumer); eventLoop.execute(consumer);
} }
//TODO handle htrace API change, see HBASE-18895 blockOnSync(future);
//scope = Trace.continueSpan(blockOnSync(future));
if (future != null) {
blockOnSync(future);
}
} }
} }

View File

@ -17,6 +17,14 @@
*/ */
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
@ -55,18 +63,10 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import com.lmax.disruptor.BlockingWaitStrategy; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
/** /**
* The default implementation of FSWAL. * The default implementation of FSWAL.
@ -702,22 +702,18 @@ public class FSHLog extends AbstractFSWAL<Writer> {
return logRollNeeded; return logRollNeeded;
} }
private SyncFuture publishSyncOnRingBuffer(long sequence) {
return publishSyncOnRingBuffer(sequence, null);
}
private long getSequenceOnRingBuffer() { private long getSequenceOnRingBuffer() {
return this.disruptor.getRingBuffer().next(); return this.disruptor.getRingBuffer().next();
} }
private SyncFuture publishSyncOnRingBuffer(Span span) { private SyncFuture publishSyncOnRingBuffer() {
long sequence = this.disruptor.getRingBuffer().next(); long sequence = getSequenceOnRingBuffer();
return publishSyncOnRingBuffer(sequence, span); return publishSyncOnRingBuffer(sequence);
} }
private SyncFuture publishSyncOnRingBuffer(long sequence, Span span) { private SyncFuture publishSyncOnRingBuffer(long sequence) {
// here we use ring buffer sequence as transaction id // here we use ring buffer sequence as transaction id
SyncFuture syncFuture = getSyncFuture(sequence, span); SyncFuture syncFuture = getSyncFuture(sequence);
try { try {
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
truck.load(syncFuture); truck.load(syncFuture);
@ -729,14 +725,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
// Sync all known transactions // Sync all known transactions
private void publishSyncThenBlockOnCompletion(TraceScope scope) throws IOException { private void publishSyncThenBlockOnCompletion(TraceScope scope) throws IOException {
if (scope != null) { SyncFuture syncFuture = publishSyncOnRingBuffer();
SyncFuture syncFuture = publishSyncOnRingBuffer(scope.getSpan()); blockOnSync(syncFuture);
blockOnSync(syncFuture);
}
else {
SyncFuture syncFuture = publishSyncOnRingBuffer(null);
blockOnSync(syncFuture);
}
} }
/** /**

View File

@ -20,9 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.htrace.core.Span; import org.apache.yetus.audience.InterfaceAudience;
/** /**
* A Future on a filesystem sync call. It given to a client or 'Handler' for it to wait on till the * A Future on a filesystem sync call. It given to a client or 'Handler' for it to wait on till the
@ -68,11 +67,6 @@ class SyncFuture {
private Thread t; private Thread t;
/**
* Optionally carry a disconnected scope to the SyncRunner.
*/
private Span span;
/** /**
* Call this method to clear old usage and get it ready for new deploy. * Call this method to clear old usage and get it ready for new deploy.
* @param txid the new transaction id * @param txid the new transaction id
@ -80,7 +74,7 @@ class SyncFuture {
* call to {@link #get(long)}. * call to {@link #get(long)}.
* @return this * @return this
*/ */
synchronized SyncFuture reset(final long txid, Span span) { synchronized SyncFuture reset(long txid) {
if (t != null && t != Thread.currentThread()) { if (t != null && t != Thread.currentThread()) {
throw new IllegalStateException(); throw new IllegalStateException();
} }
@ -90,7 +84,6 @@ class SyncFuture {
} }
this.doneTxid = NOT_DONE; this.doneTxid = NOT_DONE;
this.txid = txid; this.txid = txid;
this.span = span;
this.throwable = null; this.throwable = null;
return this; return this;
} }
@ -104,23 +97,6 @@ class SyncFuture {
return this.txid; return this.txid;
} }
/**
* Retrieve the {@code span} instance from this Future. EventHandler calls this method to continue
* the span. Thread waiting on this Future musn't call this method until AFTER calling
* {@link #get(long)} and the future has been released back to the originating thread.
*/
synchronized Span getSpan() {
return this.span;
}
/**
* Used to re-attach a {@code span} to the Future. Called by the EventHandler after a it has
* completed processing and detached the span from its scope.
*/
synchronized void setSpan(Span span) {
this.span = span;
}
/** /**
* @param txid the transaction id at which this future 'completed'. * @param txid the transaction id at which this future 'completed'.
* @param t Can be null. Set if we are 'completing' on error (and this 't' is the error). * @param t Can be null. Set if we are 'completing' on error (and this 't' is the error).

View File

@ -32,10 +32,10 @@ public class TestSyncFuture {
public void testGet() throws Exception { public void testGet() throws Exception {
long timeout = 5000; long timeout = 5000;
long txid = 100000; long txid = 100000;
SyncFuture syncFulture = new SyncFuture().reset(txid, null); SyncFuture syncFulture = new SyncFuture().reset(txid);
syncFulture.done(txid, null); syncFulture.done(txid, null);
assertEquals(txid, syncFulture.get(timeout)); assertEquals(txid, syncFulture.get(timeout));
syncFulture.reset(txid, null).get(timeout); syncFulture.reset(txid).get(timeout);
} }
} }