diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 40edc05e245..fa217eec89a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -167,6 +167,9 @@ public class LogRoller extends HasThread {
        }
      }
    }
+    for (WAL wal : walNeedsRoll.keySet()) {
+      wal.logRollerExited();
+    }
    LOG.info("LogRoller exiting.");
  }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
new file mode 100644
index 00000000000..f189ff12106
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -0,0 +1,910 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryUsage;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DrainBarrier;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.htrace.NullScope;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+/**
+ * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
+ * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
+ * This is done internal to the implementation.
+ *
+ * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles),
+ * a WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence
+ * id. A bunch of work in the below is done keeping account of these region sequence ids -- what is
+ * flushed out to hfiles, and what is yet in WAL and in memory only.
+ *
+ * It is only practical to delete entire files. Thus, we delete an entire on-disk file F when
+ * all of the edits in F have a log-sequence-id that's older (smaller) than the most-recent flush.
+ *
+ * To read a WAL, call
+ * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}.
+ *
+ * TODO: Reuse FSWALEntry's rather than create them anew each time as we do SyncFutures here.
+ *
+ * TODO: Add a FSWalEntry and SyncFuture as thread locals on handlers rather than have them get
+ * them from this Map?
+ */
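To make the file-deletion rule in the comment above concrete, here is a minimal sketch of the predicate it implies; the method and parameter names are illustrative and are not part of this patch:

```java
// Sketch only: a rolled WAL file can be let go once every region that has edits in it
// has flushed past the file's highest log-sequence-id.
static boolean canDeleteWalFile(long highestSeqIdInFile, long lowestUnflushedSeqId) {
  return highestSeqIdInFile < lowestUnflushedSeqId;
}
```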
+ private final ConcurrentMap<Thread, SyncFuture> syncFuturesByHandler;
+ *
+ * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog. We do not use a RingBuffer here
+ * because a RingBuffer needs an exclusive thread to consume the entries in it, and here we want to
+ * run the append and sync operations inside the EventLoop. We cannot use the EventLoop as the
+ * RingBuffer's executor, otherwise the EventLoop could not process any other events such as socket
+ * reads and writes.
+ *
+ * For append, we process it as follows:
+ *
+ * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same
+ * as in FSHLog.
+ * TODO: need to unify this and {@link RingBufferTruck}. They are mostly the same thing.
+ */
+ private static final class Payload {
+
+ // a wal entry which needs to be appended
+ public final FSWALEntry entry;
+
+ // indicate that we need to sync our wal writer.
+ public final SyncFuture sync;
+
+ // indicate that we want to roll the writer.
+ public final Promise
- * As data is flushed from the MemStore to other on-disk structures (files sorted by
- * key, hfiles), a WAL becomes obsolete. We can let go of all the log edits/entries for a given
- * HRegion-sequence id. A bunch of work in the below is done keeping account of these region
- * sequence ids -- what is flushed out to hfiles, and what is yet in WAL and in memory only.
- *
- * It is only practical to delete entire files. Thus, we delete an entire on-disk file
- * To read an WAL, call {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem,
- * org.apache.hadoop.fs.Path)}.
- *
- * When the sync completes, it marks all the passed in futures done. On the other end of the
- * sync future is a blocked thread, usually a regionserver Handler. There may be more than one
- * future passed in the case where a few threads arrive at about the same time and all invoke
- * 'sync'. In this case we'll batch up the invocations and run one filesystem sync only for a
- * batch of Handler sync invocations. Do not confuse these Handler SyncFutures with the futures
- * an ExecutorService returns when you call submit. We have no use for these in this model. These
+ * Thread that runs the hdfs sync call. This call takes a while to complete. This is the longest
+ * pole adding edits to the WAL and this must complete to be sure all edits persisted. We run
+ * multiple threads sync'ng rather than one that just syncs in series so we have better latencies;
+ * otherwise, an edit that arrived just after a sync started might have to wait almost the length
+ * of two sync invocations before it is marked done.
+ *
+ * When the sync completes, it marks all the passed in futures done. On the other end of the sync
+ * future is a blocked thread, usually a regionserver Handler. There may be more than one future
+ * passed in the case where a few threads arrive at about the same time and all invoke 'sync'. In
+ * this case we'll batch up the invocations and run one filesystem sync only for a batch of
+ * Handler sync invocations. Do not confuse these Handler SyncFutures with the futures an
+ * ExecutorService returns when you call submit. We have no use for these in this model. These
* SyncFutures are 'artificial', something to hold the Handler until the filesystem sync
* completes.
*/
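As a rough illustration of the batching described above (a sketch, not code from this patch), a single filesystem sync can complete a whole batch of handler SyncFutures; the helper below assumes the SyncFuture.done(long, Throwable) method from this change and a writer exposing a plain sync() call:

```java
// Illustrative only: one dfs sync releases every handler waiting in the batch.
void syncAndRelease(Writer writer, List<SyncFuture> batch, long txid) {
  Throwable error = null;
  try {
    writer.sync();              // the single, expensive filesystem sync
  } catch (IOException e) {
    error = e;                  // every future in the batch sees the same failure
  }
  for (SyncFuture future : batch) {
    future.done(txid, error);   // wakes the blocked handler thread
  }
}
```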
@@ -1130,12 +492,13 @@ public class FSHLog implements WAL {
/**
* UPDATE!
* @param syncs the batch of calls to sync that arrived as this thread was starting; when done,
- * we will put the result of the actual hdfs sync call as the result.
- * @param sequence The sequence number on the ring buffer when this thread was set running.
- * If this actual writer sync completes then all appends up this point have been
- * flushed/synced/pushed to datanodes. If we fail, then the passed in
+ * If the pipeline isn't started yet or is empty, you will get the default replication factor.
+ * Therefore, if this function returns 0, it means you are not properly running with the HDFS-826
+ * patch.
*/
@VisibleForTesting
int getLogReplication() {
try {
- //in standalone mode, it will return 0
+ // in standalone mode, it will return 0
if (this.hdfs_out instanceof HdfsDataOutputStream) {
return ((HdfsDataOutputStream) this.hdfs_out).getCurrentBlockReplication();
}
@@ -1465,7 +768,7 @@ public class FSHLog implements WAL {
@Override
public void sync(long txid) throws IOException {
- if (this.highestSyncedSequence.get() >= txid){
+ if (this.highestSyncedTxid.get() >= txid) {
// Already sync'd.
return;
}
@@ -1478,70 +781,20 @@ public class FSHLog implements WAL {
}
}
- // public only until class moves to o.a.h.h.wal
- public void requestLogRoll() {
- requestLogRoll(false);
- }
-
- private void requestLogRoll(boolean tooFewReplicas) {
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i: this.listeners) {
- i.logRollRequested(tooFewReplicas);
- }
- }
- }
-
- // public only until class moves to o.a.h.h.wal
- /** @return the number of rolled log files */
- public int getNumRolledLogFiles() {
- return byWalRegionSequenceIds.size();
- }
-
- // public only until class moves to o.a.h.h.wal
- /** @return the number of log files in use */
- public int getNumLogFiles() {
- // +1 for current use log
- return getNumRolledLogFiles() + 1;
- }
-
- // public only until class moves to o.a.h.h.wal
- /** @return the size of log files in use */
- public long getLogFileSize() {
- return this.totalLogSize.get();
- }
-
@Override
- public Long startCacheFlush(final byte[] encodedRegionName, Set
- * Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A wait until
- * Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A.
- * Thread B then holds at the 'safe point'. Thread A on notification that Thread B is paused,
- * goes ahead and does the work it needs to do while Thread B is holding. When Thread A is done,
- * it flags B and then Thread A and Thread B continue along on their merry way. Pause and
- * signalling 'zigzags' between the two participating threads. We use two latches -- one the
- * inverse of the other -- pausing and signaling when states are achieved.
- *
- * To start up the drama, Thread A creates an instance of this class each time it would do
- * this zigzag dance and passes it to Thread B (these classes use Latches so it is one shot
- * only). Thread B notices the new instance (via reading a volatile reference or how ever) and it
- * starts to work toward the 'safe point'. Thread A calls {@link #waitSafePoint()} when it
- * cannot proceed until the Thread B 'safe point' is attained. Thread A will be held inside in
- * {@link #waitSafePoint()} until Thread B reaches the 'safe point'. Once there, Thread B
- * frees Thread A by calling {@link #safePointAttained()}. Thread A now knows Thread B
- * is at the 'safe point' and that it is holding there (When Thread B calls
- * {@link #safePointAttained()} it blocks here until Thread A calls {@link #releaseSafePoint()}).
- * Thread A proceeds to do what it needs to do while Thread B is paused. When finished,
- * it lets Thread B lose by calling {@link #releaseSafePoint()} and away go both Threads again.
+ * This class is used to coordinate two threads, holding one thread at a 'safe point' while the
+ * orchestrating thread does some work that requires the first thread paused: e.g. holding the WAL
+ * writer while its WAL is swapped out from under it by another thread.
+ *
+ * Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A waits until Thread B
+ * gets there. When the 'safe point' has been attained, Thread B signals Thread A. Thread B then
+ * holds at the 'safe point'. Thread A on notification that Thread B is paused, goes ahead and
+ * does the work it needs to do while Thread B is holding. When Thread A is done, it flags B and
+ * then Thread A and Thread B continue along on their merry way. Pause and signalling 'zigzags'
+ * between the two participating threads. We use two latches -- one the inverse of the other --
+ * pausing and signaling when states are achieved.
+ *
+ * To start up the drama, Thread A creates an instance of this class each time it would do this
+ * zigzag dance and passes it to Thread B (these classes use Latches so it is one shot only).
+ * Thread B notices the new instance (via reading a volatile reference or how ever) and it starts
+ * to work toward the 'safe point'. Thread A calls {@link #waitSafePoint()} when it cannot proceed
+ * until the Thread B 'safe point' is attained. Thread A will be held inside in
+ * {@link #waitSafePoint()} until Thread B reaches the 'safe point'. Once there, Thread B frees
+ * Thread A by calling {@link #safePointAttained()}. Thread A now knows Thread B is at the 'safe
+ * point' and that it is holding there (When Thread B calls {@link #safePointAttained()} it blocks
+ * here until Thread A calls {@link #releaseSafePoint()}). Thread A proceeds to do what it needs
+ * to do while Thread B is paused. When finished, it lets Thread B loose by calling
+ * {@link #releaseSafePoint()} and away go both Threads again.
*/
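The handshake is easier to follow as code. Below is a generic two-latch sketch of the zigzag described above; it mirrors the method names in the javadoc but is not the patch's implementation (the syncFuture barometer and error handling are omitted):

```java
import java.util.concurrent.CountDownLatch;

// Thread A rolls the writer; Thread B is the consumer that must park at the safe point.
class ZigZagSketch {
  private final CountDownLatch safePointAttainedLatch = new CountDownLatch(1);
  private final CountDownLatch safePointReleasedLatch = new CountDownLatch(1);

  // Thread A: block until Thread B announces it is parked at the safe point.
  void waitSafePoint() throws InterruptedException {
    safePointAttainedLatch.await();
  }

  // Thread B: announce the safe point, then park until Thread A releases it.
  void safePointAttained() throws InterruptedException {
    safePointAttainedLatch.countDown();
    safePointReleasedLatch.await();
  }

  // Thread A: work done while B was parked; let Thread B continue.
  void releaseSafePoint() {
    safePointReleasedLatch.countDown();
  }
}
```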
static class SafePointZigZagLatch {
/**
@@ -1607,24 +839,23 @@ public class FSHLog implements WAL {
*/
private volatile CountDownLatch safePointAttainedLatch = new CountDownLatch(1);
/**
- * Latch to wait on. Will be released when we can proceed.
+ * Latch to wait on. Will be released when we can proceed.
*/
private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
/**
- * For Thread A to call when it is ready to wait on the 'safe point' to be attained.
- * Thread A will be held in here until Thread B calls {@link #safePointAttained()}
- * @param syncFuture We need this as barometer on outstanding syncs. If it comes home with
- * an exception, then something is up w/ our syncing.
- * @throws InterruptedException
- * @throws ExecutionException
+ * For Thread A to call when it is ready to wait on the 'safe point' to be attained. Thread A
+ * will be held in here until Thread B calls {@link #safePointAttained()}
+ * @param syncFuture We need this as barometer on outstanding syncs. If it comes home with an
+ * exception, then something is up w/ our syncing.
* @return The passed
- * Herein, we have an array into which we store the sync futures as they come in. When we
- * have a 'batch', we'll then pass what we have collected to a SyncRunner thread to do the
- * filesystem sync. When it completes, it will then call
- * {@link SyncFuture#done(long, Throwable)} on each of SyncFutures in the batch to release
- * blocked Handler threads.
- * I've tried various effects to try and make latencies low while keeping throughput high.
- * I've tried keeping a single Queue of SyncFutures in this class appending to its tail as the
- * syncs coming and having sync runner threads poll off the head to 'finish' completed
- * SyncFutures. I've tried linkedlist, and various from concurrent utils whether
- * LinkedBlockingQueue or ArrayBlockingQueue, etc. The more points of synchronization, the
- * more 'work' (according to 'perf stats') that has to be done; small increases in stall
- * percentages seem to have a big impact on throughput/latencies. The below model where we have
- * an array into which we stash the syncs and then hand them off to the sync thread seemed like
- * a decent compromise. See HBASE-8755 for more detail.
+ * 'writer/appender' thread. Appends edits and starts up sync runs. Tries its best to batch up
+ * syncs. There is no discernible benefit batching appends so we just append as they come in
+ * because it simplifies the below implementation. See metrics for batching effectiveness (In
+ * measurement, at 100 concurrent handlers writing 1k, we are batching > 10 appends and 10 handler
+ * sync invocations for every actual dfsclient sync call; at 10 concurrent handlers, YMMV).
+ *
+ * Herein, we have an array into which we store the sync futures as they come in. When we have a
+ * 'batch', we'll then pass what we have collected to a SyncRunner thread to do the filesystem
+ * sync. When it completes, it will then call {@link SyncFuture#done(long, Throwable)} on each of
+ * SyncFutures in the batch to release blocked Handler threads.
+ *
+ * I've tried various effects to try and make latencies low while keeping throughput high. I've
+ * tried keeping a single Queue of SyncFutures in this class, appending to its tail as the syncs
+ * come in and having sync runner threads poll off the head to 'finish' completed SyncFutures. I've
+ * tried linkedlist, and various structures from concurrent utils, whether LinkedBlockingQueue or
+ * ArrayBlockingQueue, etc. The more points of synchronization, the more 'work' (according to
+ * 'perf stats') that has to be done; small increases in stall percentages seem to have a big
+ * impact on throughput/latencies. The below model where we have an array into which we stash the
+ * syncs and then hand them off to the sync thread seemed like a decent compromise. See HBASE-8755
+ * for more detail.
*/
class RingBufferEventHandler implements EventHandler
- * Handlers coming in call append, append, append, and then do a flush/sync of
- * the edits they have appended the WAL before returning. Since sync takes a while to
- * complete, we give the Handlers back this sync future to wait on until the
- * actual HDFS sync completes. Meantime this sync future goes across the ringbuffer and into a
- * sync runner thread; when it completes, it finishes up the future, the handler get or failed
- * check completes and the Handler can then progress.
+ * A Future on a filesystem sync call. It is given to a client or 'Handler' for it to wait on till
+ * the sync completes.
*
- * This is just a partial implementation of Future; we just implement get and
- * failure. Unimplemented methods throw {@link UnsupportedOperationException}.
+ * Handlers coming in call append, append, append, and then do a flush/sync of the edits they have
+ * appended to the WAL before returning. Since sync takes a while to complete, we give the Handlers
+ * back this sync future to wait on until the actual HDFS sync completes. Meantime this sync future
+ * goes across a queue and is handled by a background thread; when it completes, it finishes up the
+ * future, the handler get or failed check completes and the Handler can then progress.
*
- * There is not a one-to-one correlation between dfs sync invocations and
- * instances of this class. A single dfs sync call may complete and mark many
- * SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync
- * call every time a Handler asks for it.
+ * This is just a partial implementation of Future; we just implement get and failure.
*
- * SyncFutures are immutable but recycled. Call #reset(long, Span) before use even
- * if it the first time, start the sync, then park the 'hitched' thread on a call to
- * #get().
+ * There is not a one-to-one correlation between dfs sync invocations and instances of this class. A
+ * single dfs sync call may complete and mark many SyncFutures as done; i.e. we batch up sync calls
+ * rather than do a dfs sync call every time a Handler asks for it.
+ *
+ * SyncFutures are immutable but recycled. Call #reset(long, Span) before use even if it is the
+ * first time, start the sync, then park the 'hitched' thread on a call to #get().
*/
@InterfaceAudience.Private
class SyncFuture {
@@ -54,17 +49,17 @@ class SyncFuture {
private static final long NOT_DONE = 0;
/**
- * The sequence at which we were added to the ring buffer.
+ * The transaction id of this operation, monotonically increases.
*/
- private long ringBufferSequence;
+ private long txid;
/**
- * The sequence that was set in here when we were marked done. Should be equal
- * or > ringBufferSequence. Put this data member into the NOT_DONE state while this
- * class is in use. But for the first position on construction, let it be -1 so we can
- * immediately call {@link #reset(long, Span)} below and it will work.
+ * The transaction id that was set in here when we were marked done. Should be equal or > txid.
+ * Put this data member into the NOT_DONE state while this class is in use. But for the first
+ * position on construction, let it be -1 so we can immediately call {@link #reset(long, Span)}
+ * below and it will work.
*/
- private long doneSequence = -1;
+ private long doneTxid = -1;
/**
* If error, the associated throwable. Set when the future is 'done'.
@@ -79,80 +74,83 @@ class SyncFuture {
private Span span;
/**
- * Call this method to clear old usage and get it ready for new deploy. Call
- * this method even if it is being used for the first time.
- *
- * @param sequence sequenceId from this Future's position in the RingBuffer
+ * Call this method to clear old usage and get it ready for new deploy. Call this method even if
+ * it is being used for the first time.
+ * @param txnId the new transaction id
* @return this
*/
- synchronized SyncFuture reset(final long sequence) {
- return reset(sequence, null);
+ synchronized SyncFuture reset(final long txnId) {
+ return reset(txnId, null);
}
/**
- * Call this method to clear old usage and get it ready for new deploy. Call
- * this method even if it is being used for the first time.
- *
+ * Call this method to clear old usage and get it ready for new deploy. Call this method even if
+ * it is being used for the first time.
* @param sequence sequenceId from this Future's position in the RingBuffer
- * @param span curren span, detached from caller. Don't forget to attach it when
- * resuming after a call to {@link #get()}.
+ * @param span current span, detached from caller. Don't forget to attach it when resuming after a
+ * call to {@link #get()}.
* @return this
*/
- synchronized SyncFuture reset(final long sequence, Span span) {
- if (t != null && t != Thread.currentThread()) throw new IllegalStateException();
+ synchronized SyncFuture reset(final long txnId, Span span) {
+ if (t != null && t != Thread.currentThread()) {
+ throw new IllegalStateException();
+ }
t = Thread.currentThread();
- if (!isDone()) throw new IllegalStateException("" + sequence + " " + Thread.currentThread());
- this.doneSequence = NOT_DONE;
- this.ringBufferSequence = sequence;
+ if (!isDone()) {
+ throw new IllegalStateException("" + txnId + " " + Thread.currentThread());
+ }
+ this.doneTxid = NOT_DONE;
+ this.txid = txnId;
this.span = span;
return this;
}
@Override
public synchronized String toString() {
- return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence;
+ return "done=" + isDone() + ", txid=" + this.txid;
}
- synchronized long getRingBufferSequence() {
- return this.ringBufferSequence;
+ synchronized long getTxid() {
+ return this.txid;
}
/**
- * Retrieve the {@code span} instance from this Future. EventHandler calls
- * this method to continue the span. Thread waiting on this Future musn't call
- * this method until AFTER calling {@link #get()} and the future has been
- * released back to the originating thread.
+ * Retrieve the {@code span} instance from this Future. EventHandler calls this method to continue
+ * the span. Thread waiting on this Future mustn't call this method until AFTER calling
+ * {@link #get()} and the future has been released back to the originating thread.
*/
synchronized Span getSpan() {
return this.span;
}
/**
- * Used to re-attach a {@code span} to the Future. Called by the EventHandler
- * after a it has completed processing and detached the span from its scope.
+ * Used to re-attach a {@code span} to the Future. Called by the EventHandler after it has
+ * completed processing and detached the span from its scope.
*/
synchronized void setSpan(Span span) {
this.span = span;
}
/**
- * @param sequence Sync sequence at which this future 'completed'.
- * @param t Can be null. Set if we are 'completing' on error (and this 't' is the error).
- * @return True if we successfully marked this outstanding future as completed/done.
- * Returns false if this future is already 'done' when this method called.
+ * @param txid the transaction id at which this future 'completed'.
+ * @param t Can be null. Set if we are 'completing' on error (and this 't' is the error).
+ * @return True if we successfully marked this outstanding future as completed/done. Returns false
+ * if this future is already 'done' when this method is called.
*/
- synchronized boolean done(final long sequence, final Throwable t) {
- if (isDone()) return false;
+ synchronized boolean done(final long txid, final Throwable t) {
+ if (isDone()) {
+ return false;
+ }
this.throwable = t;
- if (sequence < this.ringBufferSequence) {
+ if (txid < this.txid) {
// Something badly wrong.
if (throwable == null) {
- this.throwable = new IllegalStateException("sequence=" + sequence +
- ", ringBufferSequence=" + this.ringBufferSequence);
+ this.throwable =
+ new IllegalStateException("done txid=" + txid + ", my txid=" + this.txid);
}
}
// Mark done.
- this.doneSequence = sequence;
+ this.doneTxid = txid;
// Wake up waiting threads.
notify();
return true;
@@ -166,21 +164,14 @@ class SyncFuture {
while (!isDone()) {
wait(1000);
}
- if (this.throwable != null) throw new ExecutionException(this.throwable);
- return this.doneSequence;
- }
-
- public Long get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException {
- throw new UnsupportedOperationException();
- }
-
- public boolean isCancelled() {
- throw new UnsupportedOperationException();
+ if (this.throwable != null) {
+ throw new ExecutionException(this.throwable);
+ }
+ return this.doneTxid;
}
synchronized boolean isDone() {
- return this.doneSequence != NOT_DONE;
+ return this.doneTxid != NOT_DONE;
}
synchronized boolean isThrowable() {
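Putting the methods above together, one pass through the recycle cycle looks roughly like the sketch below; the txid value and the extra thread are placeholders for illustration only:

```java
// Reset for (re)use, let another thread mark the future done, then get() returns the txid.
long demoSyncFutureCycle() throws InterruptedException, ExecutionException {
  final SyncFuture future = new SyncFuture().reset(42L); // ready for first use or reuse
  new Thread(() -> future.done(42L, null)).start();      // stands in for the syncing thread
  return future.get();                                   // blocks until done(...) is called
}
```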
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
deleted file mode 100644
index 8188e02c6dc..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.io.util.LRUDictionary;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-
-/**
- * Context used by our wal dictionary compressor. Null if we're not to do our
- * custom dictionary compression.
- */
-@InterfaceAudience.Private
-public abstract class WriterBase implements DefaultWALProvider.Writer {
-
- protected CompressionContext compressionContext;
- protected Configuration conf;
-
- @Override
- public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) throws IOException {
- this.conf = conf;
- }
-
- public boolean initializeCompressionContext(Configuration conf, Path path) throws IOException {
- boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
- if (doCompress) {
- try {
- this.compressionContext = new CompressionContext(LRUDictionary.class,
- FSUtils.isRecoveredEdits(path), conf.getBoolean(
- CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true));
- } catch (Exception e) {
- throw new IOException("Failed to initiate CompressionContext", e);
- }
- }
- return doCompress;
- }
-
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
index ea71701a7de..32fe48b8958 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DataChecksum;
@@ -417,7 +418,6 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
public boolean progress() {
return DFS_CLIENT_ADAPTOR.isClientRunning(client);
}
-
}
static {
@@ -579,6 +579,18 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
return futureList;
}
+ /**
+ * Exception other than RemoteException thrown when calling create on namenode
+ */
+ public static class NameNodeException extends IOException {
+
+ private static final long serialVersionUID = 3143237406477095390L;
+
+ public NameNodeException(Throwable cause) {
+ super(cause);
+ }
+ }
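A sketch of how a caller might react to the new exception type (the retry/abort hooks are placeholders, not code from this patch): a NameNodeException means the create call itself failed on the namenode, which a caller may want to treat differently from a datanode-side problem:

```java
// Placeholder handler: only the instanceof split on NameNodeException is the point here.
void handleCreateFailure(IOException e) {
  if (e instanceof FanOutOneBlockAsyncDFSOutputHelper.NameNodeException) {
    abortCreate(e);   // placeholder: surface the namenode failure, do not blindly retry
  } else {
    retryCreate(e);   // placeholder: likely a datanode problem, retrying may help
  }
}
```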
+
private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoop eventLoop) throws IOException {
@@ -587,11 +599,20 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
DFSClient client = dfs.getClient();
String clientName = client.getClientName();
ClientProtocol namenode = client.getNamenode();
- HdfsFileStatus stat = FILE_CREATER.create(namenode, src,
- FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
- new EnumSetWritable
+ * nextWriter.
+ *
+ *
+ * @param oldPath may be null
+ * @param newPath may be null
+ * @param nextWriter may be null
+ * @return the passed in newPath
+ * @throws IOException if there is a problem flushing or closing the underlying FS
+ */
+ Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
+ TraceScope scope = Trace.startSpan("FSHFile.replaceWriter");
+ try {
+ long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter);
+ int oldNumEntries = this.numEntries.get();
+ final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
+ if (oldPath != null) {
+ this.byWalRegionSequenceIds.put(oldPath, this.sequenceIdAccounting.resetHighest());
+ this.totalLogSize.addAndGet(oldFileLen);
+ LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries
+ + ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
+ } else {
+ LOG.info("New WAL " + newPathString);
+ }
+ return newPath;
+ } finally {
+ scope.close();
+ }
+ }
+
+ protected Span blockOnSync(final SyncFuture syncFuture) throws IOException {
+ // Now we have published the ringbuffer, halt the current thread until we get an answer back.
+ try {
+ syncFuture.get();
+ return syncFuture.getSpan();
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted", ie);
+ throw convertInterruptedExceptionToIOException(ie);
+ } catch (ExecutionException e) {
+ throw ensureIOException(e.getCause());
+ }
+ }
+
+ private static IOException ensureIOException(final Throwable t) {
+ return (t instanceof IOException) ? (IOException) t : new IOException(t);
+ }
+
+ private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ IOException ioe = new InterruptedIOException();
+ ioe.initCause(ie);
+ return ioe;
+ }
+
+ @Override
+ public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
+ rollWriterLock.lock();
+ try {
+ // Return if nothing to flush.
+ if (!force && this.writer != null && this.numEntries.get() <= 0) {
+ return null;
+ }
+ byte[][] regionsToFlush = null;
+ if (this.closed) {
+ LOG.debug("WAL closed. Skipping rolling of writer");
+ return regionsToFlush;
+ }
+ if (!closeBarrier.beginOp()) {
+ LOG.debug("WAL closing. Skipping rolling of writer");
+ return regionsToFlush;
+ }
+ TraceScope scope = Trace.startSpan("FSHLog.rollWriter");
+ try {
+ Path oldPath = getOldPath();
+ Path newPath = getNewPath();
+ // Any exception from here on is catastrophic, non-recoverable so we currently abort.
+ W nextWriter = this.createWriterInstance(newPath);
+ tellListenersAboutPreLogRoll(oldPath, newPath);
+ // NewPath could be equal to oldPath if replaceWriter fails.
+ newPath = replaceWriter(oldPath, newPath, nextWriter);
+ tellListenersAboutPostLogRoll(oldPath, newPath);
+ // Can we delete any of the old log files?
+ if (getNumRolledLogFiles() > 0) {
+ cleanOldLogs();
+ regionsToFlush = findRegionsToForceFlush();
+ }
+ } finally {
+ closeBarrier.endOp();
+ assert scope == NullScope.INSTANCE || !scope.isDetached();
+ scope.close();
+ }
+ return regionsToFlush;
+ } finally {
+ rollWriterLock.unlock();
+ }
+ }
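For context, a sketch of how a caller such as the log roller is expected to consume rollWriter's return value; the flush hook and the wrapper method are placeholders, not part of this patch:

```java
// Roll, then ask the regions named in the result to flush so their old edits can leave the WAL.
void rollAndRequestFlushes(AbstractFSWAL<?> wal) throws IOException {
  byte[][] regionsToFlush = wal.rollWriter(true);
  if (regionsToFlush != null) {
    for (byte[] encodedRegionName : regionsToFlush) {
      scheduleFlush(encodedRegionName); // placeholder for the caller's flush-request hook
    }
  }
}
```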
+
+ // public only until class moves to o.a.h.h.wal
+ /** @return the size of log files in use */
+ public long getLogFileSize() {
+ return this.totalLogSize.get();
+ }
+
+ // public only until class moves to o.a.h.h.wal
+ public void requestLogRoll() {
+ requestLogRoll(false);
+ }
+
+ /**
+ * Get the backing files associated with this WAL.
+ * @return may be null if there are no files.
+ */
+ protected FileStatus[] getFiles() throws IOException {
+ return FSUtils.listStatus(fs, walDir, ourFiles);
+ }
+
+ @Override
+ public void shutdown() throws IOException {
+ if (!shutdown.compareAndSet(false, true)) {
+ return;
+ }
+ closed = true;
+ try {
+ // Prevent all further flushing and rolling.
+ closeBarrier.stopAndDrainOps();
+ } catch (InterruptedException e) {
+ LOG.error("Exception while waiting for cache flushes and log rolls", e);
+ Thread.currentThread().interrupt();
+ }
+ // Tell our listeners that the log is closing
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.logCloseRequested();
+ }
+ }
+ doShutdown();
+ }
+
+ @Override
+ public void close() throws IOException {
+ shutdown();
+ final FileStatus[] files = getFiles();
+ if (null != files && 0 != files.length) {
+ for (FileStatus file : files) {
+ Path p = getWALArchivePath(this.walArchiveDir, file.getPath());
+ // Tell our listeners that a log is going to be archived.
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.preLogArchive(file.getPath(), p);
+ }
+ }
+
+ if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
+ throw new IOException("Unable to rename " + file.getPath() + " to " + p);
+ }
+ // Tell our listeners that a log was archived.
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.postLogArchive(file.getPath(), p);
+ }
+ }
+ }
+ LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.walArchiveDir));
+ }
+ LOG.info("Closed WAL: " + toString());
+ }
+
+ protected SyncFuture getSyncFuture(final long sequence, Span span) {
+ SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
+ if (syncFuture == null) {
+ syncFuture = new SyncFuture();
+ this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
+ }
+ return syncFuture.reset(sequence, span);
+ }
+
+ protected void requestLogRoll(boolean tooFewReplicas) {
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.logRollRequested(tooFewReplicas);
+ }
+ }
+ }
+
+ long getUnflushedEntriesCount() {
+ long highestSynced = this.highestSyncedTxid.get();
+ long highestUnsynced = this.highestUnsyncedTxid;
+ return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
+ }
+
+ boolean isUnflushedEntries() {
+ return getUnflushedEntriesCount() > 0;
+ }
+
+ /**
+ * Exposed for testing only. Used for tricks like halting the ring buffer appending.
+ */
+ @VisibleForTesting
+ void atHeadOfRingBufferEventHandlerAppend() {
+ // Noop
+ }
+
+ protected boolean append(W writer, FSWALEntry entry) throws IOException {
+ // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
+ atHeadOfRingBufferEventHandlerAppend();
+ long start = EnvironmentEdgeManager.currentTime();
+ byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
+ long regionSequenceId = WALKey.NO_SEQUENCE_ID;
+ // We are about to append this edit; update the region-scoped sequence number. Do it
+ // here inside this single appending/writing thread. Events are ordered on the ringbuffer
+ // so region sequenceids will also be in order.
+ regionSequenceId = entry.stampRegionSequenceId();
+ // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
+ // region sequence id only, a region edit/sequence id that is not associated with an actual
+ // edit. It has to go through all the rigmarole to be sure we have the right ordering.
+ if (entry.getEdit().isEmpty()) {
+ return false;
+ }
+
+ // Coprocessor hook.
+ if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit())) {
+ if (entry.getEdit().isReplay()) {
+ // Set replication scope null so that this won't be replicated
+ entry.getKey().serializeReplicationScope(false);
+ }
+ }
+ if (!listeners.isEmpty()) {
+ for (WALActionsListener i : listeners) {
+ i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit());
+ }
+ }
+ doAppend(writer, entry);
+ assert highestUnsyncedTxid < entry.getTxid();
+ highestUnsyncedTxid = entry.getTxid();
+ sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
+ entry.isInMemstore());
+ coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
+ // Update metrics.
+ postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
+ numEntries.incrementAndGet();
+ return true;
+ }
+
+ private long postAppend(final Entry e, final long elapsedTime) {
+ long len = 0;
+ if (!listeners.isEmpty()) {
+ for (Cell cell : e.getEdit().getCells()) {
+ len += CellUtil.estimatedSerializedSizeOf(cell);
+ }
+ for (WALActionsListener listener : listeners) {
+ listener.postAppend(len, elapsedTime);
+ }
+ }
+ return len;
+ }
+
+ protected void postSync(final long timeInNanos, final int handlerSyncs) {
+ if (timeInNanos > this.slowSyncNs) {
+ String msg =
+ new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000)
+ .append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString();
+ Trace.addTimelineAnnotation(msg);
+ LOG.info(msg);
+ }
+ if (!listeners.isEmpty()) {
+ for (WALActionsListener listener : listeners) {
+ listener.postSync(timeInNanos, handlerSyncs);
+ }
+ }
+ }
+
+ /**
+ * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
+ * transaction by calling 'begin' wherein we assign this update a sequenceid. At assignment
+ * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
+ * 'complete' this mvcc transaction by calling
+ * MultiVersionConcurrencyControl#complete(...) or a variant, otherwise mvcc will get stuck. Do it
+ * in the finally of a try/finally block within which this append lives and any subsequent
+ * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the
+ * passed in WALKey walKey parameter. Be warned that the WriteEntry is not
+ * immediately available on return from this method. It WILL be available subsequent to a sync of
+ * this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
+ */
+ @Override
+ public abstract long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore)
+ throws IOException;
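The note above boils down to a calling pattern like the sketch below; the mvcc accessor names follow common HBase usage and the surrounding variables are placeholders rather than code from this patch:

```java
// Append, sync, and always complete the mvcc transaction, even when the sync fails.
void appendAndSync(WAL wal, HRegionInfo info, WALKey key, WALEdit edits,
    MultiVersionConcurrencyControl mvcc) throws IOException {
  long txid = wal.append(info, key, edits, true);  // starts the mvcc transaction
  try {
    wal.sync(txid);                                // WriteEntry is filled in by/after the sync
  } finally {
    mvcc.complete(key.getWriteEntry());            // never leave the mvcc transaction open
  }
}
```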
+
+ protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
+
+ protected abstract W createWriterInstance(Path path) throws IOException;
+
+ /**
+ * @return old wal file size
+ */
+ protected abstract long doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
+ throws IOException;
+
+ protected abstract void doShutdown() throws IOException;
+
+ /**
+ * This method gets the pipeline for the current WAL.
+ */
+ @VisibleForTesting
+ abstract DatanodeInfo[] getPipeline();
+
+ /**
+ * This method gets the datanode replication count for the current WAL.
+ */
+ @VisibleForTesting
+ abstract int getLogReplication();
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
new file mode 100644
index 00000000000..66f1f54bc26
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
+import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * Base class for Protobuf log writer.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public abstract class AbstractProtobufLogWriter {
+
+ private static final Log LOG = LogFactory.getLog(AbstractProtobufLogWriter.class);
+
+ protected CompressionContext compressionContext;
+ protected Configuration conf;
+ protected Codec.Encoder cellEncoder;
+ protected WALCellCodec.ByteStringCompressor compressor;
+ protected boolean trailerWritten;
+ protected WALTrailer trailer;
+ // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
+ // than this size, it is written/read respectively, with a WARN message in the log.
+ protected int trailerWarnSize;
+
+ protected AtomicLong length = new AtomicLong();
+
+ private WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext)
+ throws IOException {
+ return WALCellCodec.create(conf, null, compressionContext);
+ }
+
+ protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
+ throws IOException {
+ if (!builder.hasWriterClsName()) {
+ builder.setWriterClsName(ProtobufLogWriter.class.getSimpleName());
+ }
+ if (!builder.hasCellCodecClsName()) {
+ builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf));
+ }
+ return builder.build();
+ }
+
+ private boolean initializeCompressionContext(Configuration conf, Path path) throws IOException {
+ boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
+ if (doCompress) {
+ try {
+ this.compressionContext = new CompressionContext(LRUDictionary.class,
+ FSUtils.isRecoveredEdits(path),
+ conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true));
+ } catch (Exception e) {
+ throw new IOException("Failed to initiate CompressionContext", e);
+ }
+ }
+ return doCompress;
+ }
+
+ public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
+ throws IOException {
+ this.conf = conf;
+ boolean doCompress = initializeCompressionContext(conf, path);
+ this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
+ int bufferSize = FSUtils.getDefaultBufferSize(fs);
+ short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
+ FSUtils.getDefaultReplication(fs, path));
+ long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize",
+ FSUtils.getDefaultBlockSize(fs, path));
+
+ initOutput(fs, path, overwritable, bufferSize, replication, blockSize);
+
+ boolean doTagCompress = doCompress
+ && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
+ length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf,
+ WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))));
+
+ initAfterHeader(doCompress);
+
+ // instantiate trailer to default value.
+ trailer = WALTrailer.newBuilder().build();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress);
+ }
+ }
+
+ protected void initAfterHeader(boolean doCompress) throws IOException {
+ WALCellCodec codec = getCodec(conf, this.compressionContext);
+ this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
+ if (doCompress) {
+ this.compressor = codec.getByteStringCompressor();
+ }
+ }
+
+ void setWALTrailer(WALTrailer walTrailer) {
+ this.trailer = walTrailer;
+ }
+
+ public long getLength() {
+ return length.get();
+ }
+
+ private WALTrailer buildWALTrailer(WALTrailer.Builder builder) {
+ return builder.build();
+ }
+
+ protected void writeWALTrailer() {
+ try {
+ int trailerSize = 0;
+ if (this.trailer == null) {
+ // use default trailer.
+ LOG.warn("WALTrailer is null. Continuing with default.");
+ this.trailer = buildWALTrailer(WALTrailer.newBuilder());
+ trailerSize = this.trailer.getSerializedSize();
+ } else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) {
+ // continue writing after warning the user.
+ LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " + trailerSize
+ + " > " + this.trailerWarnSize);
+ }
+ length.set(writeWALTrailerAndMagic(trailer, ProtobufLogReader.PB_WAL_COMPLETE_MAGIC));
+ this.trailerWritten = true;
+ } catch (IOException ioe) {
+ LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe);
+ }
+ }
+
+ protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
+ short replication, long blockSize) throws IOException;
+
+ /**
+ * Return the file length after the header has been written.
+ */
+ protected abstract long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException;
+
+ protected abstract long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic)
+ throws IOException;
+
+ protected abstract OutputStream getOutputStreamForCellEncoder();
+}
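To show what the abstract surface above leaves to subclasses, here is a compact sketch loosely modeled on the synchronous ProtobufLogWriter; the FSDataOutputStream plumbing is an assumption for illustration and not code from this patch:

```java
// A minimal concrete writer: open the stream, write magic + header, write trailer + magic,
// and expose the stream the cell encoder should write to.
class SketchProtobufLogWriter extends AbstractProtobufLogWriter {
  private FSDataOutputStream output;

  @Override
  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
      short replication, long blockSize) throws IOException {
    this.output = fs.create(path, overwritable, bufferSize, replication, blockSize);
  }

  @Override
  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
    output.write(magic);
    header.writeDelimitedTo(output);              // header follows the magic, length-delimited
    return output.getPos();
  }

  @Override
  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
    trailer.writeTo(output);
    output.writeInt(trailer.getSerializedSize()); // trailer size lets a reader seek back to it
    output.write(magic);
    return output.getPos();
  }

  @Override
  protected OutputStream getOutputStreamForCellEncoder() {
    return output;
  }
}
```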
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
new file mode 100644
index 00000000000..b80f2c9f33c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -0,0 +1,732 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.apache.hadoop.hbase.HConstants.REGION_SERVER_HANDLER_COUNT;
+import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate;
+import io.netty.channel.EventLoop;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.channels.CompletionHandler;
+import java.util.ArrayDeque;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutput;
+import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.htrace.NullScope;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+/**
+ * An asynchronous implementation of FSWAL.
+ *
+ *
+ * For sync, the processing stages are almost the same, except that if a sync is not assigned a new
+ * 'txid', we just assign the previous 'txid' to it without bumping the 'nextTxid'. And different
+ * from FSHLog, we will open a new writer and rewrite unacked entries to the new writer and sync
+ * again if we hit a sync error.
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * For a normal roll request (for example, we have reached the log roll size):
+ *
+ *
+ * For a broken writer roll request, the only difference is that we can bypass the wait for safe
+ * point stage. See the comments in the 'failed' method of the sync CompletionHandler for more
+ * details.
+ */
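A generic sketch of the hand-off this comment describes (illustrative, not AsyncFSWAL's code): producers enqueue payloads and the consume step runs as a task on the netty EventLoop, so no dedicated consumer thread is needed and the loop stays free for socket reads and writes between drains:

```java
import io.netty.channel.EventLoop;

import java.util.ArrayDeque;
import java.util.Deque;

// Producers call publish(); consume() drains the queue on the event loop thread.
class EventLoopConsumerSketch<T> {
  private final Deque<T> queue = new ArrayDeque<>();
  private final EventLoop eventLoop;

  EventLoopConsumerSketch(EventLoop eventLoop) {
    this.eventLoop = eventLoop;
  }

  void publish(T payload) {
    synchronized (queue) {
      queue.addLast(payload);
    }
    eventLoop.execute(this::consume);  // the drain itself happens on the event loop thread
  }

  private void consume() {
    for (;;) {
      T payload;
      synchronized (queue) {
        payload = queue.pollFirst();
      }
      if (payload == null) {
        return;
      }
      handle(payload);                 // placeholder for the actual append/sync handling
    }
  }

  void handle(T payload) {
  }
}
```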
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class AsyncFSWAL extends AbstractFSWAL
+ *
+ * F when all of the edits in F have a log-sequence-id that's older
- * (smaller) than the most-recent flush.
- *
- * Failure Semantic
- * If an exception on append or sync, roll the WAL because the current WAL is now a lame duck;
- * any more appends or syncs will fail also with the same original exception. If we have made
- * successful appends to the WAL and we then are unable to sync them, our current semantic is to
- * return error to the client that the appends failed but also to abort the current context,
- * usually the hosting server. We need to replay the WALs. TODO: Change this semantic. A roll of
- * WAL may be sufficient as long as we have flagged client that the append failed. TODO:
- * replication may pick up these last edits though they have been marked as failed append (Need to
- * keep our own file lengths, not rely on HDFS).
+ * The default implementation of FSWAL.
*/
@InterfaceAudience.Private
-public class FSHLog implements WAL {
+public class FSHLog extends AbstractFSWAL
- * dir location.
- *
- * You should never have to load an existing log. If there is a log at
- * startup, it should have already been processed and deleted by the time the
- * WAL object is started up.
- *
+ * Create an edit log at the given dir location. You should never have to load an
+ * existing log. If there is a log at startup, it should have already been processed and deleted
+ * by the time the WAL object is started up.
* @param fs filesystem handle
* @param rootDir path to where logs and oldlogs
* @param logDir dir where wals are stored
* @param archiveDir dir where wals are archived
* @param conf configuration to use
- * @param listeners Listeners on WAL events. Listeners passed here will
- * be registered before we do anything else; e.g. the
- * Constructor {@link #rollWriter()}.
- * @param failIfWALExists If true IOException will be thrown if files related to this wal
- * already exist.
- * @param prefix should always be hostname and port in distributed env and
- * it will be URL encoded before being used.
- * If prefix is null, "wal" will be used
+ * @param listeners Listeners on WAL events. Listeners passed here will be registered before we do
+ * anything else; e.g. the Constructor {@link #rollWriter()}.
+ * @param failIfWALExists If true IOException will be thrown if files related to this wal already
+ * exist.
+ * @param prefix should always be hostname and port in distributed env and it will be URL encoded
+ * before being used. If prefix is null, "wal" will be used
* @param suffix will be url encoded. null is treated as empty. non-empty must start with
- * {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER}
- * @throws IOException
+ * {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER}
*/
public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
- final String archiveDir, final Configuration conf,
- final List
- * nextWriter.
- *
- * In the case of creating a new WAL, oldPath will be null.
- *
- * In the case of rolling over from one file to the next, none of the params will be null.
- *
- * In the case of closing out this FSHLog with no further use newPath, nextWriter, and
- * nextHdfsOut will be null.
- *
- * @param oldPath may be null
- * @param newPath may be null
- * @param nextWriter may be null
- * @param nextHdfsOut may be null
- * @return the passed in newPath
- * @throws IOException if there is a problem flushing or closing the underlying FS
- */
- Path replaceWriter(final Path oldPath, final Path newPath, Writer nextWriter,
- final FSDataOutputStream nextHdfsOut)
- throws IOException {
- // Ask the ring buffer writer to pause at a safe point. Once we do this, the writer
+ @Override
+ protected void doAppend(Writer writer, FSWALEntry entry) throws IOException {
+ writer.append(entry);
+ }
+
+ @Override
+ protected long doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException {
+ // Ask the ring buffer writer to pause at a safe point. Once we do this, the writer
// thread will eventually pause. An error hereafter needs to release the writer thread
- // regardless -- hence the finally block below. Note, this method is called from the FSHLog
+ // regardless -- hence the finally block below. Note, this method is called from the FSHLog
// constructor BEFORE the ring buffer is set running so it is null on first time through
// here; allow for that.
SyncFuture syncFuture = null;
- SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null)?
- null: this.ringBufferEventHandler.attainSafePoint();
+ SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null) ? null
+ : this.ringBufferEventHandler.attainSafePoint();
afterCreatingZigZagLatch();
- TraceScope scope = Trace.startSpan("FSHFile.replaceWriter");
+ long oldFileLen = 0L;
try {
- // Wait on the safe point to be achieved. Send in a sync in case nothing has hit the
+ // Wait on the safe point to be achieved. Send in a sync in case nothing has hit the
// ring buffer between the above notification of writer that we want it to go to
- // 'safe point' and then here where we are waiting on it to attain safe point. Use
+ // 'safe point' and then here where we are waiting on it to attain safe point. Use
// 'sendSync' instead of 'sync' because we do not want this thread to block waiting on it
- // to come back. Cleanup this syncFuture down below after we are ready to run again.
+ // to come back. Cleanup this syncFuture down below after we are ready to run again.
try {
if (zigzagLatch != null) {
Trace.addTimelineAnnotation("awaiting safepoint");
@@ -833,44 +340,37 @@ public class FSHLog implements WAL {
}
} catch (FailedSyncBeforeLogCloseException e) {
// If unflushed/unsynced entries on close, it is reason to abort.
- if (isUnflushedEntries()) throw e;
- LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " +
- e.getMessage());
+ if (isUnflushedEntries()) {
+ throw e;
+ }
+ LOG.warn(
+ "Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage());
}
-
- // It is at the safe point. Swap out writer from under the blocked writer thread.
- // TODO: This is close is inline with critical section. Should happen in background?
- try {
- if (this.writer != null) {
+ // It is at the safe point. Swap out writer from under the blocked writer thread.
+ // TODO: This is close is inline with critical section. Should happen in background?
+ if (this.writer != null) {
+ oldFileLen = this.writer.getLength();
+ try {
Trace.addTimelineAnnotation("closing writer");
this.writer.close();
Trace.addTimelineAnnotation("writer closed");
- }
- this.closeErrorCount.set(0);
- } catch (IOException ioe) {
- int errors = closeErrorCount.incrementAndGet();
- if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) {
- LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" +
- ioe.getMessage() + "\", errors=" + errors +
- "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK");
- } else {
- throw ioe;
+ this.closeErrorCount.set(0);
+ } catch (IOException ioe) {
+ int errors = closeErrorCount.incrementAndGet();
+ if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) {
+ LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" + ioe.getMessage()
+ + "\", errors=" + errors
+ + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK");
+ } else {
+ throw ioe;
+ }
}
}
this.writer = nextWriter;
- this.hdfs_out = nextHdfsOut;
- int oldNumEntries = this.numEntries.get();
- this.numEntries.set(0);
- final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
- if (oldPath != null) {
- this.byWalRegionSequenceIds.put(oldPath, this.sequenceIdAccounting.resetHighest());
- long oldFileLen = this.fs.getFileStatus(oldPath).getLen();
- this.totalLogSize.addAndGet(oldFileLen);
- LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
- ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " +
- newPathString);
+ if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {
+ this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();
} else {
- LOG.info("New WAL " + newPathString);
+ this.hdfs_out = null;
}
} catch (InterruptedException ie) {
// Perpetuate the interrupt
@@ -880,223 +380,84 @@ public class FSHLog implements WAL {
LOG.error("Failed close of WAL writer " + oldPath + ", unflushedEntries=" + count, e);
throw new FailedLogCloseException(oldPath + ", unflushedEntries=" + count, e);
} finally {
- try {
- // Let the writer thread go regardless, whether error or not.
- if (zigzagLatch != null) {
- zigzagLatch.releaseSafePoint();
- // syncFuture will be null if we failed our wait on safe point above. Otherwise, if
- // latch was obtained successfully, the sync we threw in either trigger the latch or it
- // got stamped with an exception because the WAL was damaged and we could not sync. Now
- // the write pipeline has been opened up again by releasing the safe point, process the
- // syncFuture we got above. This is probably a noop but it may be stale exception from
- // when old WAL was in place. Catch it if so.
- if (syncFuture != null) {
- try {
- blockOnSync(syncFuture);
- } catch (IOException ioe) {
- if (LOG.isTraceEnabled()) LOG.trace("Stale sync exception", ioe);
+ // Let the writer thread go regardless, whether error or not.
+ if (zigzagLatch != null) {
+ zigzagLatch.releaseSafePoint();
+ // syncFuture will be null if we failed our wait on safe point above. Otherwise, if
+ // latch was obtained successfully, the sync we threw in either trigger the latch or it
+ // got stamped with an exception because the WAL was damaged and we could not sync. Now
+ // the write pipeline has been opened up again by releasing the safe point, process the
+ // syncFuture we got above. This is probably a noop but it may be stale exception from
+ // when old WAL was in place. Catch it if so.
+ if (syncFuture != null) {
+ try {
+ blockOnSync(syncFuture);
+ } catch (IOException ioe) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Stale sync exception", ioe);
}
}
}
- } finally {
- scope.close();
}
}
- return newPath;
+ return oldFileLen;
}
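// Editor's note: a minimal sketch (hypothetical names, not part of this patch) of the safe-point
// handshake doReplaceWriter() relies on above. Thread A is the log roller calling
// doReplaceWriter(); Thread B is the single ring buffer consumer. The real implementation is
// FSHLog's SafePointZigZagLatch.
import java.util.concurrent.CountDownLatch;

class SafePointSketch {
  private final CountDownLatch attained = new CountDownLatch(1);
  private final CountDownLatch released = new CountDownLatch(1);

  // Thread B calls this when it reaches a safe point, then parks until Thread A lets it go.
  void safePointAttained() throws InterruptedException {
    attained.countDown();
    released.await();
  }

  // Thread A blocks here until Thread B has parked at the safe point.
  void waitSafePoint() throws InterruptedException {
    attained.await();
  }

  // Thread A calls this after swapping in the next writer so Thread B can resume appending.
  void releaseSafePoint() {
    released.countDown();
  }
}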
- long getUnflushedEntriesCount() {
- long highestSynced = this.highestSyncedSequence.get();
- return highestSynced > this.highestUnsyncedSequence?
- 0: this.highestUnsyncedSequence - highestSynced;
- }
-
- boolean isUnflushedEntries() {
- return getUnflushedEntriesCount() > 0;
- }
-
- /*
- * only public so WALSplitter can use.
- * @return archived location of a WAL file with the given path p
- */
- public static Path getWALArchivePath(Path archiveDir, Path p) {
- return new Path(archiveDir, p.getName());
- }
-
- private void archiveLogFile(final Path p) throws IOException {
- Path newPath = getWALArchivePath(this.fullPathArchiveDir, p);
- // Tell our listeners that a log is going to be archived.
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.preLogArchive(p, newPath);
+ @Override
+ protected void doShutdown() throws IOException {
+ // Shutdown the disruptor. Will stop after all entries have been processed. Make sure we
+ // have stopped incoming appends before calling this else it will not shutdown. We are
+ // conservative below waiting a long time and if not elapsed, then halting.
+ if (this.disruptor != null) {
+ long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000);
+ try {
+ this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt "
+ + "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)");
+ this.disruptor.halt();
+ this.disruptor.shutdown();
}
}
- LOG.info("Archiving " + p + " to " + newPath);
- if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
- throw new IOException("Unable to rename " + p + " to " + newPath);
+ // With disruptor down, this is safe to let go.
+ if (this.appendExecutor != null) {
+ this.appendExecutor.shutdown();
}
- // Tell our listeners that a log has been archived.
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.postLogArchive(p, newPath);
- }
- }
- }
- /**
- * This is a convenience method that computes a new filename with a given
- * file-number.
- * @param filenum to use
- * @return Path
- */
- protected Path computeFilename(final long filenum) {
- if (filenum < 0) {
- throw new RuntimeException("WAL file number can't be < 0");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing WAL writer in " + FSUtils.getPath(walDir));
+ }
+ if (this.writer != null) {
+ this.writer.close();
+ this.writer = null;
}
- String child = logFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + logFileSuffix;
- return new Path(fullPathLogDir, child);
- }
-
- /**
- * This is a convenience method that computes a new filename with a given
- * using the current WAL file-number
- * @return Path
- */
- public Path getCurrentFileName() {
- return computeFilename(this.filenum.get());
}
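// Editor's note: doShutdown() above reads the disruptor shutdown timeout from
// "hbase.wal.disruptor.shutdown.timeout.ms" (default 60000 ms). A hedged sketch of overriding it
// programmatically; the 120000 ms value is purely illustrative.
Configuration conf = HBaseConfiguration.create();
conf.setLong("hbase.wal.disruptor.shutdown.timeout.ms", 120000L);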
@Override
public String toString() {
- return "FSHLog " + logFilePrefix + ":" + logFileSuffix + "(num " + filenum + ")";
+ return "FSHLog " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
}
-/**
- * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}.
- * This helper method returns the creation timestamp from a given log file.
- * It extracts the timestamp assuming the filename is created with the
- * {@link #computeFilename(long filenum)} method.
- * @param fileName
- * @return timestamp, as in the log file name.
- */
- protected long getFileNumFromFileName(Path fileName) {
- if (fileName == null) throw new IllegalArgumentException("file name can't be null");
- if (!ourFiles.accept(fileName)) {
- throw new IllegalArgumentException("The log file " + fileName +
- " doesn't belong to this WAL. (" + toString() + ")");
- }
- final String fileNameString = fileName.toString();
- String chompedPath = fileNameString.substring(prefixPathStr.length(),
- (fileNameString.length() - logFileSuffix.length()));
- return Long.parseLong(chompedPath);
- }
-
- @Override
- public void close() throws IOException {
- shutdown();
- final FileStatus[] files = getFiles();
- if (null != files && 0 != files.length) {
- for (FileStatus file : files) {
- Path p = getWALArchivePath(this.fullPathArchiveDir, file.getPath());
- // Tell our listeners that a log is going to be archived.
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.preLogArchive(file.getPath(), p);
- }
- }
-
- if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
- throw new IOException("Unable to rename " + file.getPath() + " to " + p);
- }
- // Tell our listeners that a log was archived.
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.postLogArchive(file.getPath(), p);
- }
- }
- }
- LOG.debug("Moved " + files.length + " WAL file(s) to " +
- FSUtils.getPath(this.fullPathArchiveDir));
- }
- LOG.info("Closed WAL: " + toString());
- }
-
- @Override
- public void shutdown() throws IOException {
- if (shutdown.compareAndSet(false, true)) {
- try {
- // Prevent all further flushing and rolling.
- closeBarrier.stopAndDrainOps();
- } catch (InterruptedException e) {
- LOG.error("Exception while waiting for cache flushes and log rolls", e);
- Thread.currentThread().interrupt();
- }
-
- // Shutdown the disruptor. Will stop after all entries have been processed. Make sure we
- // have stopped incoming appends before calling this else it will not shutdown. We are
- // conservative below waiting a long time and if not elapsed, then halting.
- if (this.disruptor != null) {
- long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000);
- try {
- this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " +
- "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)");
- this.disruptor.halt();
- this.disruptor.shutdown();
- }
- }
- // With disruptor down, this is safe to let go.
- if (this.appendExecutor != null) this.appendExecutor.shutdown();
-
- // Tell our listeners that the log is closing
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.logCloseRequested();
- }
- }
- this.closed = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closing WAL writer in " + FSUtils.getPath(fullPathLogDir));
- }
- if (this.writer != null) {
- this.writer.close();
- this.writer = null;
- }
- }
- }
-
- /**
- * NOTE: This append, at a time that is usually after this call returns, starts an
- * mvcc transaction by calling 'begin' wherein which we assign this update a sequenceid. At
- * assignment time, we stamp all the passed in Cells inside WALEdit with their sequenceId.
- * You must 'complete' the transaction this mvcc transaction by calling
- * MultiVersionConcurrencyControl#complete(...) or a variant otherwise mvcc will get stuck. Do it
- * in the finally of a try/finally
- * block within which this append lives and any subsequent operations like sync or
- * update of memstore, etc. Get the WriteEntry to pass mvcc out of the passed in WALKey
- * <code>walKey</code> parameter. Be warned that the WriteEntry is not immediately available
- * on return from this method. It WILL be available subsequent to a sync of this append;
- * otherwise, you will just have to wait on the WriteEntry to get filled in.
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
- justification="Will never be null")
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
+ justification = "Will never be null")
@Override
public long append(final HRegionInfo hri,
final WALKey key, final WALEdit edits, final boolean inMemstore) throws IOException {
- if (this.closed) throw new IOException("Cannot append; log is closed");
- // Make a trace scope for the append. It is closed on other side of the ring buffer by the
- // single consuming thread. Don't have to worry about it.
+ if (this.closed) {
+ throw new IOException("Cannot append; log is closed");
+ }
+ // Make a trace scope for the append. It is closed on other side of the ring buffer by the
+ // single consuming thread. Don't have to worry about it.
TraceScope scope = Trace.startSpan("FSHLog.append");
- // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need
+ // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need
// all this to make a key and then below to append the edit, we need to carry htd, info,
// etc. all over the ring buffer.
FSWALEntry entry = null;
long sequence = this.disruptor.getRingBuffer().next();
try {
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
- // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
+ // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
// edit with its edit/sequence id.
// TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
entry = new FSWALEntry(sequence, key, edits, hri, inMemstore);
@@ -1108,17 +469,18 @@ public class FSHLog implements WAL {
}
/**
- * Thread to runs the hdfs sync call. This call takes a while to complete. This is the longest
- * pole adding edits to the WAL and this must complete to be sure all edits persisted. We run
- * multiple threads sync'ng rather than one that just syncs in series so we have better
- * latencies; otherwise, an edit that arrived just after a sync started, might have to wait
- * almost the length of two sync invocations before it is marked done.
- * syncs
- * futures will return the exception to their clients; some of the edits may have made it out
- * to data nodes but we will report all that were part of this session as failed.
+ * we will put the result of the actual hdfs sync call as the result.
+ * @param sequence The sequence number on the ring buffer when this thread was set running. If
+ * this actual writer sync completes then all appends up this point have been
+ * flushed/synced/pushed to datanodes. If we fail, then the passed in
+ * <code>syncs</code> futures will return the exception to their clients; some of the
+ * edits may have made it out to data nodes but we will report all that were part of
+ * this session as failed.
*/
SyncRunner(final String name, final int maxHandlersCount) {
super(name);
@@ -1145,17 +508,17 @@ public class FSHLog implements WAL {
//
// We could let the capacity be 'open' but bound it so we get alerted in pathological case
// where we cannot sync and we have a bunch of threads all backed up waiting on their syncs
- // to come in. LinkedBlockingQueue actually shrinks when you remove elements so Q should
- // stay neat and tidy in usual case. Let the max size be three times the maximum handlers.
+ // to come in. LinkedBlockingQueue actually shrinks when you remove elements so Q should
+ // stay neat and tidy in usual case. Let the max size be three times the maximum handlers.
// The passed in maxHandlerCount is the user-level handlers which is what we put up most of
// but HBase has other handlers running too -- opening region handlers which want to write
- // the meta table when succesful (i.e. sync), closing handlers -- etc. These are usually
+ // the meta table when successful (i.e. sync), closing handlers -- etc. These are usually
// much fewer in number than the user-space handlers so Q-size should be user handlers plus
- // some space for these other handlers. Lets multiply by 3 for good-measure.
+ // some space for these other handlers. Lets multiply by 3 for good-measure.
this.syncFutures = new LinkedBlockingQueue<SyncFuture>(maxHandlersCount * 3);
* Release the passed <code>syncFuture</code>
- * @param syncFuture
- * @param currentSequence
- * @param t
* @return Returns 1.
*/
private int releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence,
final Throwable t) {
- if (!syncFuture.done(currentSequence, t)) throw new IllegalStateException();
+ if (!syncFuture.done(currentSequence, t)) {
+ throw new IllegalStateException();
+ }
// This function releases one sync future only.
return 1;
}
/**
* Release all SyncFutures whose sequence is <= <code>currentSequence</code>.
- * @param currentSequence
* @param t May be non-null if we are processing SyncFutures because an exception was thrown.
* @return Count of SyncFutures we let go.
*/
private int releaseSyncFutures(final long currentSequence, final Throwable t) {
int syncCount = 0;
for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) {
- if (syncFuture.getRingBufferSequence() > currentSequence) break;
+ if (syncFuture.getTxid() > currentSequence) {
+ break;
+ }
releaseSyncFuture(syncFuture, currentSequence, t);
if (!this.syncFutures.remove(syncFuture)) {
throw new IllegalStateException(syncFuture.toString());
@@ -1204,14 +567,14 @@ public class FSHLog implements WAL {
long currentHighestSyncedSequence;
// Set the highestSyncedSequence IFF our current sequence id is the 'highest'.
do {
- currentHighestSyncedSequence = highestSyncedSequence.get();
+ currentHighestSyncedSequence = highestSyncedTxid.get();
if (currentHighestSyncedSequence >= sequence) {
// Set the sync number to current highwater mark; might be able to let go more
// queued sync futures
sequence = currentHighestSyncedSequence;
break;
}
- } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence));
+ } while (!highestSyncedTxid.compareAndSet(currentHighestSyncedSequence, sequence));
return sequence;
}
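// Editor's note: the loop above is the usual "advance an AtomicLong only if the candidate is
// higher" CAS pattern. A standalone sketch under that assumption (helper name is illustrative):
static long advanceIfGreater(java.util.concurrent.atomic.AtomicLong highest, long candidate) {
  long current;
  do {
    current = highest.get();
    if (current >= candidate) {
      return current; // another sync already recorded an equal or higher txid
    }
  } while (!highest.compareAndSet(current, candidate));
  return candidate;
}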
@@ -1225,21 +588,21 @@ public class FSHLog implements WAL {
// We have to process what we 'take' from the queue
takeSyncFuture = this.syncFutures.take();
currentSequence = this.sequence;
- long syncFutureSequence = takeSyncFuture.getRingBufferSequence();
+ long syncFutureSequence = takeSyncFuture.getTxid();
if (syncFutureSequence > currentSequence) {
- throw new IllegalStateException("currentSequence=" + syncFutureSequence +
- ", syncFutureSequence=" + syncFutureSequence);
+ throw new IllegalStateException("currentSequence=" + syncFutureSequence
+ + ", syncFutureSequence=" + syncFutureSequence);
}
// See if we can process any syncfutures BEFORE we go sync.
- long currentHighestSyncedSequence = highestSyncedSequence.get();
+ long currentHighestSyncedSequence = highestSyncedTxid.get();
if (currentSequence < currentHighestSyncedSequence) {
syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
- // Done with the 'take'. Go around again and do a new 'take'.
+ // Done with the 'take'. Go around again and do a new 'take'.
continue;
}
break;
}
- // I got something. Lets run. Save off current sequence number in case it changes
+ // I got something. Lets run. Save off current sequence number in case it changes
// while we run.
TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
long start = System.nanoTime();
@@ -1262,8 +625,11 @@ public class FSHLog implements WAL {
syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
// Can we release other syncs?
syncCount += releaseSyncFutures(currentSequence, lastException);
- if (lastException != null) requestLogRoll();
- else checkLogRoll();
+ if (lastException != null) {
+ requestLogRoll();
+ } else {
+ checkLogRoll();
+ }
}
postSync(System.nanoTime() - start, syncCount);
} catch (InterruptedException e) {
@@ -1281,7 +647,9 @@ public class FSHLog implements WAL {
*/
void checkLogRoll() {
// Will return immediately if we are in the middle of a WAL log roll currently.
- if (!rollWriterLock.tryLock()) return;
+ if (!rollWriterLock.tryLock()) {
+ return;
+ }
boolean lowReplication;
try {
lowReplication = checkLowReplication();
@@ -1297,7 +665,7 @@ public class FSHLog implements WAL {
}
}
- /*
+ /**
* @return true if number of replicas for the WAL is lower than threshold
*/
private boolean checkLowReplication() {
@@ -1309,11 +677,10 @@ public class FSHLog implements WAL {
if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) {
if (this.lowReplicationRollEnabled) {
if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
- LOG.warn("HDFS pipeline error detected. " + "Found "
- + numCurrentReplicas + " replicas but expecting no less than "
- + this.minTolerableReplication + " replicas. "
- + " Requesting close of WAL. current pipeline: "
- + Arrays.toString(getPipeLine()));
+ LOG.warn("HDFS pipeline error detected. " + "Found " + numCurrentReplicas
+ + " replicas but expecting no less than " + this.minTolerableReplication
+ + " replicas. " + " Requesting close of WAL. current pipeline: "
+ + Arrays.toString(getPipeline()));
logRollNeeded = true;
// If rollWriter is requested, increase consecutiveLogRolls. Once it
// is larger than lowReplicationRollLimit, disable the
@@ -1341,8 +708,7 @@ public class FSHLog implements WAL {
}
}
} catch (Exception e) {
- LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e +
- ", continuing...");
+ LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e + ", continuing...");
}
return logRollNeeded;
}
@@ -1353,6 +719,7 @@ public class FSHLog implements WAL {
private SyncFuture publishSyncOnRingBuffer(Span span) {
long sequence = this.disruptor.getRingBuffer().next();
+ // here we use ring buffer sequence as transaction id
SyncFuture syncFuture = getSyncFuture(sequence, span);
try {
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
@@ -1368,81 +735,17 @@ public class FSHLog implements WAL {
return blockOnSync(publishSyncOnRingBuffer(span));
}
- private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
- // Now we have published the ringbuffer, halt the current thread until we get an answer back.
- try {
- syncFuture.get();
- return syncFuture.getSpan();
- } catch (InterruptedException ie) {
- LOG.warn("Interrupted", ie);
- throw convertInterruptedExceptionToIOException(ie);
- } catch (ExecutionException e) {
- throw ensureIOException(e.getCause());
- }
- }
-
- private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
- Thread.currentThread().interrupt();
- IOException ioe = new InterruptedIOException();
- ioe.initCause(ie);
- return ioe;
- }
-
- private SyncFuture getSyncFuture(final long sequence, Span span) {
- SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
- if (syncFuture == null) {
- syncFuture = new SyncFuture();
- this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
- }
- return syncFuture.reset(sequence, span);
- }
-
- private void postSync(final long timeInNanos, final int handlerSyncs) {
- if (timeInNanos > this.slowSyncNs) {
- String msg =
- new StringBuilder().append("Slow sync cost: ")
- .append(timeInNanos / 1000000).append(" ms, current pipeline: ")
- .append(Arrays.toString(getPipeLine())).toString();
- Trace.addTimelineAnnotation(msg);
- LOG.info(msg);
- }
- if (!listeners.isEmpty()) {
- for (WALActionsListener listener : listeners) {
- listener.postSync(timeInNanos, handlerSyncs);
- }
- }
- }
-
- private long postAppend(final Entry e, final long elapsedTime) {
- long len = 0;
- if (!listeners.isEmpty()) {
- for (Cell cell : e.getEdit().getCells()) {
- len += CellUtil.estimatedSerializedSizeOf(cell);
- }
- for (WALActionsListener listener : listeners) {
- listener.postAppend(len, elapsedTime);
- }
- }
- return len;
- }
-
-
/**
- * This method gets the datanode replication count for the current WAL.
- *
- * If the pipeline isn't started yet or is empty, you will get the default
- * replication factor. Therefore, if this function returns 0, it means you
- * are not properly running with the HDFS-826 patch.
- * @throws InvocationTargetException
- * @throws IllegalAccessException
- * @throws IllegalArgumentException
- *
- * @throws Exception
+ * {@inheritDoc}
+ * @return The passed <code>syncFuture</code>
- * @throws FailedSyncBeforeLogCloseException
*/
- SyncFuture waitSafePoint(final SyncFuture syncFuture)
- throws InterruptedException, FailedSyncBeforeLogCloseException {
+ SyncFuture waitSafePoint(final SyncFuture syncFuture) throws InterruptedException,
+ FailedSyncBeforeLogCloseException {
while (true) {
- if (this.safePointAttainedLatch.await(1, TimeUnit.NANOSECONDS)) break;
+ if (this.safePointAttainedLatch.await(1, TimeUnit.NANOSECONDS)) {
+ break;
+ }
if (syncFuture.isThrowable()) {
throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
}
@@ -1633,10 +864,9 @@ public class FSHLog implements WAL {
}
/**
- * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals
- * Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()}
- * is called by Thread A.
- * @throws InterruptedException
+ * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals Thread
+ * A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} is called
+ * by Thread A.
*/
void safePointAttained() throws InterruptedException {
this.safePointAttainedLatch.countDown();
@@ -1644,8 +874,8 @@ public class FSHLog implements WAL {
}
/**
- * Called by Thread A when it is done with the work it needs to do while Thread B is
- * halted. This will release the Thread B held in a call to {@link #safePointAttained()}
+ * Called by Thread A when it is done with the work it needs to do while Thread B is halted.
+ * This will release the Thread B held in a call to {@link #safePointAttained()}
*/
void releaseSafePoint() {
this.safePointReleasedLatch.countDown();
@@ -1655,44 +885,44 @@ public class FSHLog implements WAL {
* @return True is this is a 'cocked', fresh instance, and not one that has already fired.
*/
boolean isCocked() {
- return this.safePointAttainedLatch.getCount() > 0 &&
- this.safePointReleasedLatch.getCount() > 0;
+ return this.safePointAttainedLatch.getCount() > 0
+ && this.safePointReleasedLatch.getCount() > 0;
}
}
/**
* Handler that is run by the disruptor ringbuffer consumer. Consumer is a SINGLE
- * 'writer/appender' thread. Appends edits and starts up sync runs. Tries its best to batch up
- * syncs. There is no discernible benefit batching appends so we just append as they come in
- * because it simplifies the below implementation. See metrics for batching effectiveness
- * (In measurement, at 100 concurrent handlers writing 1k, we are batching > 10 appends and 10
- * handler sync invocations for every actual dfsclient sync call; at 10 concurrent handlers,
- * YMMV).
- * <code>stdout</code> or split the specified log files.
- *
- * @param args
- * @throws IOException
+ * Pass one or more log file names and it will either dump out a text version on
+ * <code>stdout</code> or split the specified log files.
*/
public static void main(String[] args) throws IOException {
if (args.length < 2) {
@@ -1963,8 +1151,8 @@ public class FSHLog implements WAL {
WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
} else if (args[0].compareTo("--perf") == 0) {
LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:");
- LOG.fatal("\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " +
- args[1]);
+ LOG.fatal(
+ "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
System.exit(-1);
} else if (args[0].compareTo("--split") == 0) {
Configuration conf = HBaseConfiguration.create();
@@ -1987,8 +1175,8 @@ public class FSHLog implements WAL {
/**
* This method gets the pipeline for the current WAL.
*/
- @VisibleForTesting
- DatanodeInfo[] getPipeLine() {
+ @Override
+ DatanodeInfo[] getPipeline() {
if (this.hdfs_out != null) {
if (this.hdfs_out.getWrappedStream() instanceof DFSOutputStream) {
return ((DFSOutputStream) this.hdfs_out.getWrappedStream()).getPipeline();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 06318f068e5..24f195d9fe1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -48,17 +48,20 @@ import org.apache.hadoop.hbase.wal.WALKey;
class FSWALEntry extends Entry {
// The below data members are denoted 'transient' just to highlight these are not persisted;
// they are only in memory and held here while passing over the ring buffer.
- private final transient long sequence;
+ private final transient long txid;
private final transient boolean inMemstore;
private final transient HRegionInfo hri;
- private final Set<byte[]> familyNames;
+ *
+ * It also uses the providerId to differentiate among files.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider {
+ * @return the relative WAL directory name, e.g. <code>.logs/1.example.org,60030,12345</code> if
+ * <code>serverName</code> passed is <code>1.example.org,60030,12345</code>
+ */
+ public static String getWALDirectoryName(final String serverName) {
+ StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
+ dirName.append("/");
+ dirName.append(serverName);
+ return dirName.toString();
+ }
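// Editor's note: a hedged usage sketch of getWALDirectoryName(). Assuming
// HConstants.HREGION_LOGDIR_NAME is ".logs" as the javadoc above shows, a server name maps to its
// per-server WAL directory:
String walDir = AbstractFSWALProvider.getWALDirectoryName("1.example.org,60030,12345");
// walDir is ".logs/1.example.org,60030,12345" under that assumption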
+
+ /**
+ * Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
+ * this method ignores the format of the logfile component. Current format: [base directory for
+ * hbase]/hbase/.logs/ServerName/logfile or [base directory for
+ * hbase]/hbase/.logs/ServerName-splitting/logfile Expected to work for individual log files and
+ * server-specific directories.
+ * @return null if it's not a log file. Returns the ServerName of the region server that created
+ * this log file otherwise.
+ */
+ public static ServerName getServerNameFromWALDirectoryName(Configuration conf, String path)
+ throws IOException {
+ if (path == null || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
+ return null;
+ }
+
+ if (conf == null) {
+ throw new IllegalArgumentException("parameter conf must be set");
+ }
+
+ final String rootDir = conf.get(HConstants.HBASE_DIR);
+ if (rootDir == null || rootDir.isEmpty()) {
+ throw new IllegalArgumentException(HConstants.HBASE_DIR + " key not found in conf.");
+ }
+
+ final StringBuilder startPathSB = new StringBuilder(rootDir);
+ if (!rootDir.endsWith("/")) {
+ startPathSB.append('/');
+ }
+ startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
+ if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) {
+ startPathSB.append('/');
+ }
+ final String startPath = startPathSB.toString();
+
+ String fullPath;
+ try {
+ fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
+ } catch (IllegalArgumentException e) {
+ LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
+ return null;
+ }
+
+ if (!fullPath.startsWith(startPath)) {
+ return null;
+ }
+
+ final String serverNameAndFile = fullPath.substring(startPath.length());
+
+ if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
+ // Either it's a file (not a directory) or it's not a ServerName format
+ return null;
+ }
+
+ Path p = new Path(path);
+ return getServerNameFromWALDirectoryName(p);
+ }
+
+ /**
+ * This function returns region server name from a log file name which is in one of the following
+ * formats:
+ *
+ *
+ * @return null if the passed in logFile isn't a valid WAL file path
+ */
+ public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
+ String logDirName = logFile.getParent().getName();
+ // We were passed the directory and not a file in it.
+ if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
+ logDirName = logFile.getName();
+ }
+ ServerName serverName = null;
+ if (logDirName.endsWith(SPLITTING_EXT)) {
+ logDirName = logDirName.substring(0, logDirName.length() - SPLITTING_EXT.length());
+ }
+ try {
+ serverName = ServerName.parseServerName(logDirName);
+ } catch (IllegalArgumentException ex) {
+ serverName = null;
+ LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
+ }
+ if (serverName != null && serverName.getStartcode() < 0) {
+ LOG.warn("Invalid log file path=" + logFile);
+ serverName = null;
+ }
+ return serverName;
+ }
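// Editor's note: a hedged usage sketch of getServerNameFromWALDirectoryName(Path); the path below
// is illustrative only. The parent directory name is parsed as a ServerName, after stripping a
// trailing "-splitting" extension; anything that does not parse (or has a negative start code)
// yields null.
Path wal = new Path("/hbase/.logs/1.example.org,60030,12345/1.example.org%2C60030%2C12345.1234567890");
ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(wal);
// sn represents 1.example.org,60030,12345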
+
+ public static boolean isMetaFile(Path p) {
+ return isMetaFile(p.getName());
+ }
+
+ public static boolean isMetaFile(String p) {
+ if (p != null && p.endsWith(META_WAL_PROVIDER_ID)) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Get prefix of the log from its name, assuming WAL name in format of
+ * log_prefix.filenumber.log_suffix
+ * @param name Name of the WAL to parse
+ * @return prefix of the log
+ * @see AbstractFSWAL#getCurrentFileName()
+ */
+ public static String getWALPrefixFromWALName(String name) {
+ int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
+ return name.substring(0, endIndex);
+ }
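// Editor's note: a hedged usage sketch of getWALPrefixFromWALName(); the file name is
// illustrative. Everything before the last dot (after stripping any meta provider id) is the
// prefix.
String prefix = AbstractFSWALProvider.getWALPrefixFromWALName("1.example.org%2C60030%2C12345.1454567890123");
// prefix is "1.example.org%2C60030%2C12345"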
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
new file mode 100644
index 00000000000..bc142ce9639
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+/**
+ * A WAL provider that use {@link AsyncFSWAL}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
- *
- * It also uses the providerId to diffentiate among files.
- *
+ * A WAL provider that use {@link FSHLog}.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class DefaultWALProvider implements WALProvider {
- private static final Log LOG = LogFactory.getLog(DefaultWALProvider.class);
+public class DefaultWALProvider extends AbstractFSWALProvider<FSHLog> {
- * @return the relative WAL directory name, e.g. <code>.logs/1.example.org,60030,12345</code> if
- * <code>serverName</code> passed is
- * <code>1.example.org,60030,12345</code>
- */
- public static String getWALDirectoryName(final String serverName) {
- StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
- dirName.append("/");
- dirName.append(serverName);
- return dirName.toString();
- }
-
- /**
- * Pulls a ServerName out of a Path generated according to our layout rules.
- *
- * In the below layouts, this method ignores the format of the logfile component.
- *
- * Current format:
- *
- * [base directory for hbase]/hbase/.logs/ServerName/logfile
- * or
- * [base directory for hbase]/hbase/.logs/ServerName-splitting/logfile
- *
- * Expected to work for individual log files and server-specific directories.
- *
- * @return null if it's not a log file. Returns the ServerName of the region
- * server that created this log file otherwise.
- */
- public static ServerName getServerNameFromWALDirectoryName(Configuration conf, String path)
- throws IOException {
- if (path == null
- || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
- return null;
- }
-
- if (conf == null) {
- throw new IllegalArgumentException("parameter conf must be set");
- }
-
- final String rootDir = conf.get(HConstants.HBASE_DIR);
- if (rootDir == null || rootDir.isEmpty()) {
- throw new IllegalArgumentException(HConstants.HBASE_DIR
- + " key not found in conf.");
- }
-
- final StringBuilder startPathSB = new StringBuilder(rootDir);
- if (!rootDir.endsWith("/"))
- startPathSB.append('/');
- startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
- if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/"))
- startPathSB.append('/');
- final String startPath = startPathSB.toString();
-
- String fullPath;
- try {
- fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
- } catch (IllegalArgumentException e) {
- LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
- return null;
- }
-
- if (!fullPath.startsWith(startPath)) {
- return null;
- }
-
- final String serverNameAndFile = fullPath.substring(startPath.length());
-
- if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
- // Either it's a file (not a directory) or it's not a ServerName format
- return null;
- }
-
- Path p = new Path(path);
- return getServerNameFromWALDirectoryName(p);
- }
-
- /**
- * This function returns region server name from a log file name which is in one of the following
- * formats:
- *
- *
- * @param logFile
- * @return null if the passed in logFile isn't a valid WAL file path
- */
- public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
- String logDirName = logFile.getParent().getName();
- // We were passed the directory and not a file in it.
- if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
- logDirName = logFile.getName();
- }
- ServerName serverName = null;
- if (logDirName.endsWith(SPLITTING_EXT)) {
- logDirName = logDirName.substring(0, logDirName.length() - SPLITTING_EXT.length());
- }
- try {
- serverName = ServerName.parseServerName(logDirName);
- } catch (IllegalArgumentException ex) {
- serverName = null;
- LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
- }
- if (serverName != null && serverName.getStartcode() < 0) {
- LOG.warn("Invalid log file path=" + logFile);
- serverName = null;
- }
- return serverName;
- }
-
- public static boolean isMetaFile(Path p) {
- return isMetaFile(p.getName());
- }
-
- public static boolean isMetaFile(String p) {
- if (p != null && p.endsWith(META_WAL_PROVIDER_ID)) {
- return true;
- }
- return false;
- }
-
/**
* public because of FSHLog. Should be package-private
*/
public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path,
- final boolean overwritable)
- throws IOException {
+ final boolean overwritable) throws IOException {
// Configuration already does caching for the Class lookup.
Class<? extends Writer> logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
- ProtobufLogWriter.class, Writer.class);
+ ProtobufLogWriter.class, Writer.class);
try {
Writer writer = logWriterClass.newInstance();
writer.init(fs, path, conf, overwritable);
@@ -380,15 +65,14 @@ public class DefaultWALProvider implements WALProvider {
}
}
- /**
- * Get prefix of the log from its name, assuming WAL name in format of
- * log_prefix.filenumber.log_suffix @see {@link FSHLog#getCurrentFileName()}
- * @param name Name of the WAL to parse
- * @return prefix of the log
- */
- public static String getWALPrefixFromWALName(String name) {
- int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
- return name.substring(0, endIndex);
+ @Override
+ protected FSHLog createWAL() throws IOException {
+ return new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
+ getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
+ true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
}
+ @Override
+ protected void doInit(Configuration conf) throws IOException {
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index c3d4b2cd124..028c60bde87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -215,6 +215,10 @@ class DisabledWALProvider implements WALProvider {
public String toString() {
return "WAL disabled.";
}
+
+ @Override
+ public void logRollerExited() {
+ }
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 0b83528242e..051ce544e99 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -198,6 +198,13 @@ public interface WAL {
@Override
String toString();
+ /**
+ * In some WAL implementations we will write WAL entries to a new file if a sync fails, which
+ * means failure recovery depends on the log roller. So here we tell the WAL that the log roller
+ * has already exited so the WAL can give up on recovery.
+ */
+ void logRollerExited();
+
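// Editor's note: a hedged sketch (not from this patch) of how a WAL implementation that waits for
// a log roll to recover from a failed sync might honor this callback; class, field and logic are
// illustrative only.
class RollAwareWALSketch {
  private volatile boolean logRollerExited = false;

  public void logRollerExited() {
    // No replacement writer will ever arrive, so stop waiting for a roll and fail pending syncs.
    this.logRollerExited = true;
  }
}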
/**
* When outside clients need to consume persisted WALs, they rely on a provided
* Reader.
@@ -268,7 +275,5 @@ public interface WAL {
public String toString() {
return this.key + "=" + this.edit;
}
-
}
-
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 08f42aa9f09..a2761df01c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -76,7 +76,8 @@ public class WALFactory {
static enum Providers {
defaultProvider(DefaultWALProvider.class),
filesystem(DefaultWALProvider.class),
- multiwal(RegionGroupingProvider.class);
+ multiwal(RegionGroupingProvider.class),
+ asyncfs(AsyncFSWALProvider.class);
Class<? extends WALProvider> clazz;
Providers(Class<? extends WALProvider> clazz) {
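// Editor's note: with the new enum constant above, the async provider can be selected through the
// WAL provider setting. A hedged sketch; "hbase.wal.provider" is assumed to be the key WALFactory
// reads when resolving the entries of this enum.
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.wal.provider", "asyncfs");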
@@ -350,9 +351,10 @@ public class WALFactory {
/**
* Create a writer for the WAL.
+ *