HBASE-10156 FSHLog Refactor (WAS -> Fix up the HBASE-8755 slowdown when low contention)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1561450 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2014-01-26 04:41:39 +00:00
parent 1975ce2ecb
commit 74b5a394f4
26 changed files with 1813 additions and 1052 deletions

View File

@ -600,8 +600,8 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
* Check if async log edits are enabled on the table.
*
* @return true if that async log flush is enabled on the table
*
* @see #setAsyncLogFlush(boolean)
* @deprecated Since 0.96 we no longer have an explicity deferred log flush/sync functionality.
* Use {@link #getDurability()}.
*/
public synchronized boolean isAsyncLogFlush() {
return getDurability() == Durability.ASYNC_WAL;

View File

@ -32,17 +32,22 @@ import java.io.IOException;
public class FailedLogCloseException extends IOException {
private static final long serialVersionUID = 1759152841462990925L;
/**
*
*/
public FailedLogCloseException() {
super();
}
/**
* @param arg0
* @param msg
*/
public FailedLogCloseException(String arg0) {
super(arg0);
public FailedLogCloseException(String msg) {
super(msg);
}
public FailedLogCloseException(final String msg, final Throwable t) {
super(msg, t);
}
public FailedLogCloseException(final Throwable t) {
super(t);
}
}

View File

@ -0,0 +1,51 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Thrown when we fail close of the write-ahead-log file.
* Package private. Only used inside this package.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class FailedSyncBeforeLogCloseException extends FailedLogCloseException {
private static final long serialVersionUID = 1759152841462990925L;
public FailedSyncBeforeLogCloseException() {
super();
}
/**
* @param msg
*/
public FailedSyncBeforeLogCloseException(String msg) {
super(msg);
}
public FailedSyncBeforeLogCloseException(final String msg, final Throwable t) {
super(msg, t);
}
public FailedSyncBeforeLogCloseException(final Throwable t) {
super(t);
}
}

View File

@ -446,7 +446,11 @@
<dependency>
<groupId>org.cloudera.htrace</groupId>
<artifactId>htrace-core</artifactId>
</dependency>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</dependency>
</dependencies>
<profiles>
<!-- Skip the tests in this module -->

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
/**
* A WAL Entry for {@link FSHLog} implementation. Immutable.
* It is a subclass of {@link HLog.Entry} that carries extra info across the ring buffer such as
* region sequence id (we want to use this later, just before we write the WAL to ensure region
* edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit
* hence marked 'transient' to underline this fact.
*/
@InterfaceAudience.Private
class FSWALEntry extends HLog.Entry {
// The below data members are denoted 'transient' just to highlight these are not persisted;
// they are only in memory and held here while passing over the ring buffer.
private final transient long sequence;
private final transient AtomicLong regionSequenceIdReference;
private final transient boolean inMemstore;
private final transient HTableDescriptor htd;
private final transient HRegionInfo hri;
FSWALEntry(final long sequence, final HLogKey key, final WALEdit edit,
final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
final HTableDescriptor htd, final HRegionInfo hri) {
super(key, edit);
this.regionSequenceIdReference = referenceToRegionSequenceId;
this.inMemstore = inMemstore;
this.htd = htd;
this.hri = hri;
this.sequence = sequence;
}
public String toString() {
return "sequence=" + this.sequence + ", " + super.toString();
};
AtomicLong getRegionSequenceIdReference() {
return this.regionSequenceIdReference;
}
boolean isInMemstore() {
return this.inMemstore;
}
HTableDescriptor getHTableDescriptor() {
return this.htd;
}
HRegionInfo getHRegionInfo() {
return this.hri;
}
/**
* @return The sequence on the ring buffer when this edit was added.
*/
long getSequence() {
return this.sequence;
}
}

View File

@ -42,7 +42,9 @@ import org.apache.hadoop.io.Writable;
import com.google.common.annotations.VisibleForTesting;
/**
* HLog records all the edits to HStore. It is the hbase write-ahead-log (WAL).
*/
@InterfaceAudience.Private
// TODO: Rename interface to WAL
public interface HLog {
@ -52,7 +54,8 @@ public interface HLog {
// TODO: this seems like an implementation detail that does not belong here.
String SPLITTING_EXT = "-splitting";
boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
/** The hbase:meta region's HLog filename extension */
/** The hbase:meta region's HLog filename extension.*/
// TODO: Implementation detail. Does not belong in here.
String META_HLOG_FILE_EXTN = ".meta";
/**
@ -63,12 +66,14 @@ public interface HLog {
String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
// TODO: Implemenation detail. Why in here?
// TODO: Implementation detail. Why in here?
Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
/**
* WAL Reader Interface
*/
interface Reader {
/**
* @param fs File system.
* @param path Path.
@ -90,12 +95,16 @@ public interface HLog {
/**
* @return the WALTrailer of the current HLog. It may be null in case of legacy or corrupt WAL
* files.
* files.
*/
// TODO: What we need a trailer on WAL for?
// TODO: What we need a trailer on WAL for? It won't be present on last WAL most of the time.
// What then?
WALTrailer getWALTrailer();
}
/**
* WAL Writer Intrface.
*/
interface Writer {
void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
@ -108,17 +117,19 @@ public interface HLog {
long getLength() throws IOException;
/**
* Sets HLog's WALTrailer. This trailer is appended at the end of WAL on closing.
* Sets HLog/WAL's WALTrailer. This trailer is appended at the end of WAL on closing.
* @param walTrailer trailer to append to WAL.
*/
// TODO: Why a trailer on the log?
void setWALTrailer(WALTrailer walTrailer);
}
/**
* Utility class that lets us keep track of the edit with it's key.
* Only used when splitting logs.
* Utility class that lets us keep track of the edit and it's associated key. Only used when
* splitting logs.
*/
// TODO: Remove this Writable.
// TODO: Why is this in here? Implementation detail?
class Entry implements Writable {
private WALEdit edit;
private HLogKey key;
@ -135,7 +146,6 @@ public interface HLog {
* @param key log's key
*/
public Entry(HLogKey key, WALEdit edit) {
super();
this.key = key;
this.edit = edit;
}
@ -161,8 +171,7 @@ public interface HLog {
/**
* Set compression context for this entry.
*
* @param compressionContext
* Compression context
* @param compressionContext Compression context
*/
public void setCompressionContext(CompressionContext compressionContext) {
edit.setCompressionContext(compressionContext);
@ -189,14 +198,14 @@ public interface HLog {
}
/**
* registers WALActionsListener
* Registers WALActionsListener
*
* @param listener
*/
void registerWALActionsListener(final WALActionsListener listener);
/**
* unregisters WALActionsListener
* Unregisters WALActionsListener
*
* @param listener
*/
@ -250,8 +259,7 @@ public interface HLog {
* @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
* @throws IOException
*/
byte[][] rollWriter(boolean force) throws FailedLogCloseException,
IOException;
byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException;
/**
* Shut down the log.
@ -261,43 +269,68 @@ public interface HLog {
void close() throws IOException;
/**
* Shut down the log and delete the log directory
* Shut down the log and delete the log directory.
* Used by tests only and in rare cases where we need a log just temporarily while bootstrapping
* a region or running migrations.
*
* @throws IOException
*/
void closeAndDelete() throws IOException;
/**
* Same as appendNoSync(HRegionInfo, TableName, WALEdit, List, long, HTableDescriptor),
* Same as {@link #appendNoSync(HRegionInfo, TableName, WALEdit, List, long, HTableDescriptor,
* AtomicLong, boolean, long, long)}
* except it causes a sync on the log
* @param sequenceId of the region.
* @param info
* @param tableName
* @param edits
* @param now
* @param htd
* @param sequenceId
* @throws IOException
*/
@VisibleForTesting
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException;
/**
* For notification post append to the writer.
* @param entries
* For notification post append to the writer. Used by metrics system at least.
* @param entry
* @param elapsedTime
* @return Size of this append.
*/
void postAppend(final List<Entry> entries);
long postAppend(final Entry entry, final long elapsedTime);
/**
* For notification post writer sync.
* For notification post writer sync. Used by metrics system at least.
* @param timeInMillis How long the filesystem sync took in milliseconds.
* @param handlerSyncs How many sync handler calls were released by this call to filesystem
* sync.
*/
void postSync();
void postSync(final long timeInMillis, final int handlerSyncs);
/**
* Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and
* log-sequence-id. The HLog is not flushed after this transaction is written to the log.
* Append a set of edits to the WAL. WAL edits are keyed by (encoded) regionName, rowname, and
* log-sequence-id. The WAL is not flushed/sync'd after this transaction completes.
* Call {@link #sync()} to flush/sync all outstanding edits/appends.
* @param info
* @param tableName
* @param edits
* @param clusterIds The clusters that have consumed the change (for replication)
* @param clusterIds
* @param now
* @param htd
* @param sequenceId of the region
* @return txid of this transaction
* @param sequenceId A reference to the atomic long the <code>info</code> region is using as
* source of its incrementing edits sequence id. Inside in this call we will increment it and
* attach the sequence to the edit we apply the WAL.
* @param isInMemstore Always true except for case where we are writing a compaction completion
* record into the WAL; in this case the entry is just so we can finish an unfinished compaction
* -- it is not an edit for memstore.
* @param nonceGroup
* @param nonce
* @return Returns a 'transaction id'. Do not use. This is an internal implementation detail and
* cannot be respected in all implementations; i.e. the append/sync machine may or may not be
* able to sync an explicit edit only (the current default implementation syncs up to the time
* of the sync call syncing whatever is behind the sync).
* @throws IOException
*/
long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
@ -311,6 +344,14 @@ public interface HLog {
void sync() throws IOException;
/**
* @param txid Transaction id to sync to.
* @throws IOException
* @deprecated Since 0.96.2. Just call {@link #sync()}. <code>txid</code> should not be allowed
* outside the implementation.
*/
// TODO: Why is this exposed? txid is an internal detail.
@Deprecated
void sync(long txid) throws IOException;
/**
@ -318,7 +359,7 @@ public interface HLog {
* in order to be able to do cleanup. This method tells WAL that some region is about
* to flush memstore.
*
* We stash the oldest seqNum for the region, and let the the next edit inserted in this
* <p>We stash the oldest seqNum for the region, and let the the next edit inserted in this
* region be recorded in {@link #append(HRegionInfo, TableName, WALEdit, long, HTableDescriptor,
* AtomicLong)} as new oldest seqnum.
* In case of flush being aborted, we put the stashed value back; in case of flush succeeding,

View File

@ -59,8 +59,8 @@ public class HLogFactory {
public static HLog createMetaHLog(final FileSystem fs, final Path root, final String logName,
final Configuration conf, final List<WALActionsListener> listeners,
final String prefix) throws IOException {
return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME,
conf, listeners, false, prefix, true);
return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
false, prefix, true);
}
/*

View File

@ -189,6 +189,14 @@ public class HLogKey implements WritableComparable<HLogKey> {
return this.logSeqNum;
}
/**
* Allow that the log sequence id to be set post-construction.
* @param sequence
*/
void setLogSeqNum(final long sequence) {
this.logSeqNum = sequence;
}
/**
* @return the write time
*/
@ -439,17 +447,17 @@ public class HLogKey implements WritableComparable<HLogKey> {
// Do not need to read the clusters information as we are using protobufs from 0.95
}
public WALKey.Builder getBuilder(
WALCellCodec.ByteStringCompressor compressor) throws IOException {
public WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor)
throws IOException {
WALKey.Builder builder = WALKey.newBuilder();
if (compressionContext == null) {
builder.setEncodedRegionName(ZeroCopyLiteralByteString.wrap(this.encodedRegionName));
builder.setTableName(ZeroCopyLiteralByteString.wrap(this.tablename.getName()));
} else {
builder.setEncodedRegionName(
compressor.compress(this.encodedRegionName, compressionContext.regionDict));
builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
compressionContext.regionDict));
builder.setTableName(compressor.compress(this.tablename.getName(),
compressionContext.tableDict));
compressionContext.tableDict));
}
builder.setLogSequenceNumber(this.logSeqNum);
builder.setWriteTime(writeTime);
@ -467,7 +475,8 @@ public class HLogKey implements WritableComparable<HLogKey> {
}
if (scopes != null) {
for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
ByteString family = (compressionContext == null) ? ZeroCopyLiteralByteString.wrap(e.getKey())
ByteString family = (compressionContext == null) ?
ZeroCopyLiteralByteString.wrap(e.getKey())
: compressor.compress(e.getKey(), compressionContext.familyDict);
builder.addScopes(FamilyScope.newBuilder()
.setFamily(family).setScopeType(ScopeType.valueOf(e.getValue())));

View File

@ -189,7 +189,7 @@ public class HLogUtil {
serverName = ServerName.parseServerName(logDirName);
} catch (IllegalArgumentException ex) {
serverName = null;
LOG.warn("Invalid log file path=" + logFile, ex);
LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
}
if (serverName != null && serverName.getStartcode() < 0) {
LOG.warn("Invalid log file path=" + logFile);
@ -266,9 +266,9 @@ public class HLogUtil {
WALEdit e = WALEdit.createCompaction(c);
long now = EnvironmentEdgeManager.currentTimeMillis();
TableName tn = TableName.valueOf(c.getTableName().toByteArray());
long txid = log.appendNoSync(info, tn, e, new ArrayList<UUID>(), now, htd, sequenceId,
false, HConstants.NO_NONCE, HConstants.NO_NONCE);
log.sync(txid);
log.appendNoSync(info, tn, e, new ArrayList<UUID>(), now, htd, sequenceId, false,
HConstants.NO_NONCE, HConstants.NO_NONCE);
log.sync();
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));

View File

@ -64,7 +64,8 @@ public class ProtobufLogWriter extends WriterBase {
@Override
@SuppressWarnings("deprecation")
public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) throws IOException {
public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
throws IOException {
super.init(fs, path, conf, overwritable);
assert this.output == null;
boolean doCompress = initializeCompressionContext(conf, path);
@ -99,8 +100,8 @@ public class ProtobufLogWriter extends WriterBase {
@Override
public void append(HLog.Entry entry) throws IOException {
entry.setCompressionContext(compressionContext);
entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size())
.build().writeDelimitedTo(output);
entry.getKey().getBuilder(compressor).
setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output);
for (KeyValue kv : entry.getEdit().getKeyValues()) {
// cellEncoder must assume little about the stream, since we write PB and cells in turn.
cellEncoder.write(kv);

View File

@ -0,0 +1,77 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.hadoop.classification.InterfaceAudience;
import org.cloudera.htrace.Span;
import com.lmax.disruptor.EventFactory;
/**
* A 'truck' to carry a payload across the {@link FSHLog} ring buffer from Handler to WAL.
* Has EITHER a {@link FSWALEntry} for making an append OR it has a {@link SyncFuture} to
* represent a 'sync' invocation. Immutable but instances get recycled on the ringbuffer.
*/
@InterfaceAudience.Private
class RingBufferTruck {
/**
* Either this syncFuture is set or entry is set, but not both.
*/
private SyncFuture syncFuture;
private FSWALEntry entry;
/**
* The tracing span for this entry. Can be null.
* TODO: Fix up tracing.
*/
private Span span;
void loadPayload(final FSWALEntry entry, final Span span) {
this.entry = entry;
this.span = span;
this.syncFuture = null;
}
void loadPayload(final SyncFuture syncFuture) {
this.syncFuture = syncFuture;
this.entry = null;
this.span = null;
}
FSWALEntry getFSWALEntryPayload() {
return this.entry;
}
SyncFuture getSyncFuturePayload() {
return this.syncFuture;
}
Span getSpanPayload() {
return this.span;
}
/**
* Factory for making a bunch of these. Needed by the ringbuffer/disruptor.
*/
final static EventFactory<RingBufferTruck> EVENT_FACTORY = new EventFactory<RingBufferTruck>() {
public RingBufferTruck newInstance() {
return new RingBufferTruck();
}
};
}

View File

@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* A Future on a filesystem sync call. It given to a client or 'Handler' for it to wait on till
* the sync completes.
*
* <p>Handlers coming in call append, append, append, and then do a flush/sync of
* the edits they have appended the WAL before returning. Since sync takes a while to
* complete, we give the Handlers back this sync future to wait on until the
* actual HDFS sync completes. Meantime this sync future goes across the ringbuffer and into a
* sync runner thread; when it completes, it finishes up the future, the handler get or failed
* check completes and the Handler can then progress.
* <p>
* This is just a partial implementation of Future; we just implement get and
* failure. Unimplemented methods throw {@link UnsupportedOperationException}.
* <p>
* There is not a one-to-one correlation between dfs sync invocations and
* instances of this class. A single dfs sync call may complete and mark many
* SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync
* call every time a Handler asks for it.
* <p>
* SyncFutures are immutable but recycled. Call {@link #reset(long)} before use even if it
* the first time, start the sync, then park the 'hitched' thread on a call to
* {@link #get()}
*/
@InterfaceAudience.Private
class SyncFuture {
private static final long NOT_DONE = 0;
/**
* The sequence at which we were added to the ring buffer.
*/
private long ringBufferSequence;
/**
* The sequence that was set in here when we were marked done. Should be equal
* or > ringBufferSequence. Put this data member into the NOT_DONE state while this
* class is in use. But for the first position on construction, let it be -1 so we can
* immediately call {@link #reset(long)} below and it will work.
*/
private long doneSequence = -1;
/**
* If error, the associated throwable. Set when the future is 'done'.
*/
private Throwable throwable = null;
private Thread t;
/**
* Call this method to clear old usage and get it ready for new deploy. Call
* this method even if it is being used for the first time.
*
* @param sequence
* @return this
*/
synchronized SyncFuture reset(final long sequence) {
if (t != null && t != Thread.currentThread()) throw new IllegalStateException();
t = Thread.currentThread();
if (!isDone()) throw new IllegalStateException("" + sequence + " " + Thread.currentThread());
this.doneSequence = NOT_DONE;
this.ringBufferSequence = sequence;
return this;
}
@Override
public synchronized String toString() {
return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence;
}
synchronized long getRingBufferSequence() {
return this.ringBufferSequence;
}
/**
* @param sequence Sync sequence at which this future 'completed'.
* @param t Can be null. Set if we are 'completing' on error (and this 't' is the error).
* @return True if we successfully marked this outstanding future as completed/done.
* Returns false if this future is already 'done' when this method called.
*/
synchronized boolean done(final long sequence, final Throwable t) {
if (isDone()) return false;
this.throwable = t;
if (sequence < this.ringBufferSequence) {
// Something badly wrong.
if (throwable == null) {
this.throwable = new IllegalStateException("sequence=" + sequence +
", ringBufferSequence=" + this.ringBufferSequence);
}
}
// Mark done.
this.doneSequence = sequence;
// Wake up waiting threads.
notify();
return true;
}
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException();
}
public synchronized long get() throws InterruptedException, ExecutionException {
while (!isDone()) {
wait(1000);
}
if (this.throwable != null) throw new ExecutionException(this.throwable);
return this.doneSequence;
}
public Long get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException {
throw new UnsupportedOperationException();
}
public boolean isCancelled() {
throw new UnsupportedOperationException();
}
synchronized boolean isDone() {
return this.doneSequence != NOT_DONE;
}
synchronized boolean isThrowable() {
return isDone() && getThrowable() != null;
}
synchronized Throwable getThrowable() {
return this.throwable;
}
}

View File

@ -103,6 +103,7 @@ public class WALCoprocessorHost
public boolean preWALWrite(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
throws IOException {
boolean bypass = false;
if (this.coprocessors == null || this.coprocessors.isEmpty()) return bypass;
ObserverContext<WALCoprocessorEnvironment> ctx = null;
for (WALEnvironment env: coprocessors) {
if (env.getInstance() instanceof
@ -136,6 +137,7 @@ public class WALCoprocessorHost
*/
public void postWALWrite(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
throws IOException {
if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
ObserverContext<WALCoprocessorEnvironment> ctx = null;
for (WALEnvironment env: coprocessors) {
if (env.getInstance() instanceof

View File

@ -22,17 +22,16 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@ -272,4 +271,3 @@ public class WALEdit implements Writable, HeapSize {
return null;
}
}

View File

@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -54,6 +56,7 @@ import org.junit.experimental.categories.Category;
*/
@Category(MediumTests.class)
public class TestHLogRecordReader {
private final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Configuration conf;
private static FileSystem fs;
@ -113,6 +116,8 @@ public class TestHLogRecordReader {
@Test
public void testPartialRead() throws Exception {
HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf);
// This test depends on timestamp being millisecond based and the filename of the WAL also
// being millisecond based.
long ts = System.currentTimeMillis();
WALEdit edit = new WALEdit();
final AtomicLong sequenceId = new AtomicLong(0);
@ -121,7 +126,9 @@ public class TestHLogRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
log.append(info, tableName, edit, ts+1, htd, sequenceId);
LOG.info("Before 1st WAL roll " + log.getFilenum());
log.rollWriter();
LOG.info("Past 1st WAL roll " + log.getFilenum());
Thread.sleep(1);
long ts1 = System.currentTimeMillis();
@ -133,6 +140,8 @@ public class TestHLogRecordReader {
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
log.append(info, tableName, edit, ts1+2, htd, sequenceId);
log.close();
LOG.info("Closed WAL " + log.getFilenum());
HLogInputFormat input = new HLogInputFormat();
Configuration jobConf = new Configuration(conf);
@ -141,6 +150,7 @@ public class TestHLogRecordReader {
// only 1st file is considered, and only its 1st entry is used
List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
assertEquals(1, splits.size());
testSplit(splits.get(0), Bytes.toBytes("1"));

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -57,8 +59,9 @@ public class TestParallelPut {
static final Log LOG = LogFactory.getLog(TestParallelPut.class);
@Rule public TestName name = new TestName();
private static HRegion region = null;
private static HBaseTestingUtility hbtu = new HBaseTestingUtility();
private HRegion region = null;
private static HBaseTestingUtility HBTU = new HBaseTestingUtility();
private static final int THREADS100 = 100;
// Test names
static byte[] tableName;
@ -70,6 +73,13 @@ public class TestParallelPut {
static final byte [] row = Bytes.toBytes("rowA");
static final byte [] row2 = Bytes.toBytes("rowB");
@BeforeClass
public static void beforeClass() {
// Make sure enough handlers.
HBTU.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREADS100);
}
/**
* @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
*/
@ -81,6 +91,7 @@ public class TestParallelPut {
@After
public void tearDown() throws Exception {
EnvironmentEdgeManagerTestHelper.reset();
if (region != null) region.close(true);
}
public String getName() {
@ -98,7 +109,7 @@ public class TestParallelPut {
@Test
public void testPut() throws IOException {
LOG.info("Starting testPut");
initHRegion(tableName, getName(), fam1);
this.region = initHRegion(tableName, getName(), fam1);
long value = 1L;
@ -106,7 +117,7 @@ public class TestParallelPut {
put.add(fam1, qual1, Bytes.toBytes(value));
region.put(put);
assertGet(row, fam1, qual1, Bytes.toBytes(value));
assertGet(this.region, row, fam1, qual1, Bytes.toBytes(value));
}
/**
@ -116,25 +127,25 @@ public class TestParallelPut {
public void testParallelPuts() throws IOException {
LOG.info("Starting testParallelPuts");
initHRegion(tableName, getName(), fam1);
this.region = initHRegion(tableName, getName(), fam1);
int numOps = 1000; // these many operations per thread
// create 100 threads, each will do its own puts
int numThreads = 100;
Putter[] all = new Putter[numThreads];
Putter[] all = new Putter[THREADS100];
// create all threads
for (int i = 0; i < numThreads; i++) {
for (int i = 0; i < THREADS100; i++) {
all[i] = new Putter(region, i, numOps);
}
// run all threads
for (int i = 0; i < numThreads; i++) {
for (int i = 0; i < THREADS100; i++) {
all[i].start();
}
// wait for all threads to finish
for (int i = 0; i < numThreads; i++) {
for (int i = 0; i < THREADS100; i++) {
try {
all[i].join();
} catch (InterruptedException e) {
@ -143,14 +154,12 @@ public class TestParallelPut {
}
}
LOG.info("testParallelPuts successfully verified " +
(numOps * numThreads) + " put operations.");
(numOps * THREADS100) + " put operations.");
}
static private void assertGet(byte [] row,
byte [] familiy,
byte[] qualifier,
byte[] value) throws IOException {
private static void assertGet(final HRegion region, byte [] row, byte [] familiy,
byte[] qualifier, byte[] value) throws IOException {
// run a get and see if the value matches
Get get = new Get(row);
get.addColumn(familiy, qualifier);
@ -162,7 +171,7 @@ public class TestParallelPut {
assertTrue(Bytes.compareTo(r, value) == 0);
}
private void initHRegion(byte [] tableName, String callingMethod,
private HRegion initHRegion(byte [] tableName, String callingMethod,
byte[] ... families)
throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
@ -170,7 +179,7 @@ public class TestParallelPut {
htd.addFamily(new HColumnDescriptor(family));
}
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
region = hbtu.createLocalHRegion(info, htd);
return HBTU.createLocalHRegion(info, htd);
}
/**
@ -211,7 +220,7 @@ public class TestParallelPut {
OperationStatus[] ret = region.batchMutate(in);
assertEquals(1, ret.length);
assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode());
assertGet(rowkey, fam1, qual1, value);
assertGet(this.region, rowkey, fam1, qual1, value);
} catch (IOException e) {
assertTrue("Thread id " + threadNumber + " operation " + i + " failed.",
false);
@ -219,6 +228,4 @@ public class TestParallelPut {
}
}
}
}

View File

@ -54,8 +54,8 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.reporting.ConsoleReporter;
@ -71,9 +71,16 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
private final Meter syncMeter =
metrics.newMeter(HLogPerformanceEvaluation.class, "syncMeter", "syncs", TimeUnit.MILLISECONDS);
private final Histogram syncHistogram =
metrics.newHistogram(HLogPerformanceEvaluation.class, "syncHistogram", "nanos-between-syncs", true);
metrics.newHistogram(HLogPerformanceEvaluation.class, "syncHistogram", "nanos-between-syncs",
true);
private final Histogram syncCountHistogram =
metrics.newHistogram(HLogPerformanceEvaluation.class, "syncCountHistogram", "countPerSync",
true);
private final Meter appendMeter =
metrics.newMeter(HLogPerformanceEvaluation.class, "appendMeter", "bytes", TimeUnit.MILLISECONDS);
metrics.newMeter(HLogPerformanceEvaluation.class, "appendMeter", "bytes",
TimeUnit.MILLISECONDS);
private final Histogram latencyHistogram =
metrics.newHistogram(HLogPerformanceEvaluation.class, "latencyHistogram", "nanos", true);
private HBaseTestingUtility TEST_UTIL;
@ -127,8 +134,8 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
long startTime = System.currentTimeMillis();
int lastSync = 0;
for (int i = 0; i < numIterations; ++i) {
long now = System.nanoTime();
Put put = setupPut(rand, key, value, numFamilies);
long now = System.currentTimeMillis();
WALEdit walEdit = new WALEdit();
addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
HRegionInfo hri = region.getRegionInfo();
@ -140,6 +147,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
lastSync = 0;
}
}
latencyHistogram.update(System.nanoTime() - now);
}
long totalTime = (System.currentTimeMillis() - startTime);
logBenchmarkResult(Thread.currentThread().getName(), numIterations, totalTime);
@ -231,6 +239,10 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher);
}
// Internal config. goes off number of threads; if more threads than handlers, stuff breaks.
// In regionserver, number of handlers == number of threads.
getConf().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, numThreads);
// Run HLog Performance Evaluation
// First set the fs from configs. In case we are on hadoop1
FSUtils.setFsDefault(getConf(), FSUtils.getRootDir(getConf()));
@ -245,47 +257,72 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
// Initialize Table Descriptor
HTableDescriptor htd = createHTableDescriptor(numFamilies);
final long whenToRoll = roll;
HLog hlog = new FSHLog(fs, rootRegionDir, "wals", getConf()) {
int appends = 0;
long lastSync = 0;
final HLog hlog = new FSHLog(fs, rootRegionDir, "wals", getConf()) {
@Override
protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
HTableDescriptor htd)
throws IOException {
public void postSync(final long timeInNanos, final int handlerSyncs) {
super.postSync(timeInNanos, handlerSyncs);
syncMeter.mark();
syncHistogram.update(timeInNanos);
syncCountHistogram.update(handlerSyncs);
}
@Override
public long postAppend(final HLog.Entry entry, final long elapsedTime) {
long size = super.postAppend(entry, elapsedTime);
appendMeter.mark(size);
return size;
}
};
hlog.registerWALActionsListener(new WALActionsListener() {
private int appends = 0;
@Override
public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
WALEdit logEdit) {
this.appends++;
if (this.appends % whenToRoll == 0) {
LOG.info("Rolling after " + appends + " edits");
rollWriter();
// We used to do explicit call to rollWriter but changed it to a request
// to avoid dead lock (there are less threads going on in this class than
// in the regionserver -- regionserver does not have the issue).
((FSHLog)hlog).requestLogRoll();
}
super.doWrite(info, logKey, logEdit, htd);
};
@Override
public void postSync() {
super.postSync();
syncMeter.mark();
long now = System.nanoTime();
if (lastSync > 0) {
long diff = now - lastSync;
syncHistogram.update(diff);
}
this.lastSync = now;
}
@Override
public void postAppend(List<Entry> entries) {
super.postAppend(entries);
int size = 0;
for (Entry e: entries) size += e.getEdit().heapSize();
appendMeter.mark(size);
public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
}
};
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
}
@Override
public void preLogArchive(Path oldPath, Path newPath) throws IOException {
}
@Override
public void postLogRoll(Path oldPath, Path newPath) throws IOException {
}
@Override
public void postLogArchive(Path oldPath, Path newPath) throws IOException {
}
@Override
public void logRollRequested() {
}
@Override
public void logCloseRequested() {
}
});
hlog.rollWriter();
HRegion region = null;
try {
region = openRegion(fs, rootRegionDir, htd, hlog);
ConsoleReporter.enable(this.metrics, 60, TimeUnit.SECONDS);
ConsoleReporter.enable(this.metrics, 30, TimeUnit.SECONDS);
long putTime =
runBenchmark(new HLogPutBenchmark(region, htd, numIterations, noSync, syncInterval),
numThreads);
@ -391,21 +428,27 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
System.err.println(" -nocleanup Do NOT remove test data when done.");
System.err.println(" -noclosefs Do NOT close the filesystem when done.");
System.err.println(" -nosync Append without syncing");
System.err.println(" -syncInterval <N> Append N edits and then sync. Default=0, i.e. sync every edit.");
System.err.println(" -syncInterval <N> Append N edits and then sync. " +
"Default=0, i.e. sync every edit.");
System.err.println(" -verify Verify edits written in sequence");
System.err.println(" -verbose Output extra info; e.g. all edit seq ids when verifying");
System.err.println(" -verbose Output extra info; " +
"e.g. all edit seq ids when verifying");
System.err.println(" -roll <N> Roll the way every N appends");
System.err.println(" -encryption <A> Encrypt the WAL with algorithm A, e.g. AES");
System.err.println("");
System.err.println("Examples:");
System.err.println("");
System.err.println(" To run 100 threads on hdfs with log rolling every 10k edits and verification afterward do:");
System.err.println(" $ ./bin/hbase org.apache.hadoop.hbase.regionserver.wal.HLogPerformanceEvaluation \\");
System.err.println(" -conf ./core-site.xml -path hdfs://example.org:7000/tmp -threads 100 -roll 10000 -verify");
System.err.println(" To run 100 threads on hdfs with log rolling every 10k edits and " +
"verification afterward do:");
System.err.println(" $ ./bin/hbase org.apache.hadoop.hbase.regionserver.wal." +
"HLogPerformanceEvaluation \\");
System.err.println(" -conf ./core-site.xml -path hdfs://example.org:7000/tmp " +
"-threads 100 -roll 10000 -verify");
System.exit(1);
}
private HRegion openRegion(final FileSystem fs, final Path dir, final HTableDescriptor htd, final HLog hlog)
private HRegion openRegion(final FileSystem fs, final Path dir, final HTableDescriptor htd,
final HLog hlog)
throws IOException {
// Initialize HRegion
HRegionInfo regionInfo = new HRegionInfo(htd.getTableName());

View File

@ -85,7 +85,7 @@ public class TestDurability {
region.put(newPut(null));
verifyHLogCount(wal, 1);
// a put through the deferred table does not write to the wal immdiately,
// a put through the deferred table does not write to the wal immediately,
// but maybe has been successfully sync-ed by the underlying AsyncWriter +
// AsyncFlusher thread
deferredRegion.put(newPut(null));
@ -114,7 +114,7 @@ public class TestDurability {
wal.sync();
verifyHLogCount(wal, 6);
// async overrides sync table default
// Async overrides sync table default
region.put(newPut(Durability.ASYNC_WAL));
deferredRegion.put(newPut(Durability.ASYNC_WAL));
wal.sync();

View File

@ -85,12 +85,10 @@ public class TestHLog {
@Before
public void setUp() throws Exception {
FileStatus[] entries = fs.listStatus(new Path("/"));
for (FileStatus dir : entries) {
fs.delete(dir.getPath(), true);
}
}
@After
@ -127,6 +125,7 @@ public class TestHLog {
oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
dir = new Path(hbaseDir, getName());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
@ -158,13 +157,11 @@ public class TestHLog {
*/
@Test
public void testSplit() throws IOException {
final TableName tableName =
TableName.valueOf(getName());
final byte [] rowName = tableName.getName();
Path logdir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
HLog log = HLogFactory.createHLog(fs, hbaseDir,
HConstants.HREGION_LOGDIR_NAME, conf);
HLog log = HLogFactory.createHLog(fs, hbaseDir, HConstants.HREGION_LOGDIR_NAME, conf);
final int howmany = 3;
HRegionInfo[] infos = new HRegionInfo[3];
Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
@ -199,8 +196,7 @@ public class TestHLog {
log.rollWriter();
}
log.close();
List<Path> splits = HLogSplitter.split(
hbaseDir, logdir, oldLogDir, fs, conf);
List<Path> splits = HLogSplitter.split(hbaseDir, logdir, oldLogDir, fs, conf);
verifySplits(splits, howmany);
log = null;
} finally {

View File

@ -1097,7 +1097,7 @@ public class TestHLogSplit {
}
// Send the data to HDFS datanodes and close the HDFS writer
log.sync();
((FSHLog) log).cleanupCurrentWriter(log.getFilenum());
((FSHLog) log).replaceWriter(((FSHLog)log).getOldPath(), null, null, null);
/* code taken from ProcessServerShutdown.process()
* handles RS shutdowns (as observed by the Master)

View File

@ -18,23 +18,23 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@ -53,17 +53,6 @@ public class TestLogRollAbort {
private static MiniHBaseCluster cluster;
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
// verbose logging on classes that are touched in these tests
{
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
.getLogger().setLevel(Level.ALL);
((Log4JLogger)HRegionServer.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
}
// Need to override this setup so we can edit the config before it gets sent
// to the HDFS & HBase cluster startup.
@BeforeClass
@ -120,41 +109,45 @@ public class TestLogRollAbort {
// Create the test table and open it
String tableName = this.getClass().getSimpleName();
HTableDescriptor desc = new HTableDescriptor(tableName);
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
desc.setAsyncLogFlush(true);
admin.createTable(desc);
HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
try {
HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
HLog log = server.getWAL();
HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
HLog log = server.getWAL();
assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
// don't run this test without append support (HDFS-200 & HDFS-142)
assertTrue("Need append support for this test",
assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
// don't run this test without append support (HDFS-200 & HDFS-142)
assertTrue("Need append support for this test",
FSUtils.isAppendSupported(TEST_UTIL.getConfiguration()));
Put p = new Put(Bytes.toBytes("row2001"));
p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2001));
table.put(p);
Put p = new Put(Bytes.toBytes("row2001"));
p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2001));
table.put(p);
log.sync();
log.sync();
p = new Put(Bytes.toBytes("row2002"));
p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2002));
table.put(p);
p = new Put(Bytes.toBytes("row2002"));
p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2002));
table.put(p);
dfsCluster.restartDataNodes();
LOG.info("Restarted datanodes");
dfsCluster.restartDataNodes();
LOG.info("Restarted datanodes");
try {
log.rollWriter(true);
} catch (FailedLogCloseException flce) {
assertTrue("Should have deferred flush log edits outstanding",
((FSHLog) log).hasUnSyncedEntries());
try {
log.rollWriter(true);
} catch (FailedLogCloseException flce) {
// Expected exception. We used to expect that there would be unsynced appends but this
// not reliable now that sync plays a roll in wall rolling. The above puts also now call
// sync.
} catch (Throwable t) {
LOG.fatal("FAILED TEST: Got wrong exception", t);
}
} finally {
table.close();
}
}
}

View File

@ -91,18 +91,6 @@ public class TestLogRolling {
private MiniHBaseCluster cluster;
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
// verbose logging on classes that are touched in these tests
{
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
.getLogger().setLevel(Level.ALL);
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)HRegionServer.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
}
/**
* constructor
* @throws Exception
@ -135,8 +123,7 @@ public class TestLogRolling {
// We roll the log after every 32 writes
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
TEST_UTIL.getConfiguration().setInt(
"hbase.regionserver.logroll.errors.tolerated", 2);
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
@ -162,13 +149,11 @@ public class TestLogRolling {
// quickly detects datanode failures
TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
// the namenode might still try to choose the recently-dead datanode
// for a pipeline, so try to a new pipeline multiple times
TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
TEST_UTIL.getConfiguration().setInt(
"hbase.regionserver.hlog.tolerable.lowreplication", 2);
TEST_UTIL.getConfiguration().setInt(
"hbase.regionserver.hlog.lowreplication.rolllimit", 3);
// the namenode might still try to choose the recently-dead datanode
// for a pipeline, so try to a new pipeline multiple times
TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
}
@Before
@ -220,6 +205,7 @@ public class TestLogRolling {
@Test
public void testLogRolling() throws Exception {
this.tableName = getName();
// TODO: Why does this write data take for ever?
startAndWriteData();
LOG.info("after writing there are " + ((FSHLog) log).getNumRolledLogFiles() + " log files");
@ -322,6 +308,7 @@ public class TestLogRolling {
*/
@Test
public void testLogRollOnDatanodeDeath() throws Exception {
TEST_UTIL.ensureSomeRegionServersAvailable(2);
assertTrue("This test requires HLog file replication set to 2.",
fs.getDefaultReplication() == 2);
LOG.info("Replication=" + fs.getDefaultReplication());
@ -363,7 +350,7 @@ public class TestLogRolling {
assertTrue("DataNodes " + dfsCluster.getDataNodes().size() +
" default replication " + fs.getDefaultReplication(),
dfsCluster.getDataNodes().size() >= fs.getDefaultReplication() + 1);
dfsCluster.getDataNodes().size() >= fs.getDefaultReplication() + 1);
writeData(table, 2);

View File

@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
@ -54,8 +55,9 @@ public class TestLogRollingNoCluster {
public void testContendedLogRolling() throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
Path dir = TEST_UTIL.getDataTestDir();
HLog wal = HLogFactory.createHLog(fs, dir, "logs",
TEST_UTIL.getConfiguration());
// The implementation needs to know the 'handler' count.
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
HLog wal = HLogFactory.createHLog(fs, dir, "logs", TEST_UTIL.getConfiguration());
Appender [] appenders = null;
@ -122,7 +124,6 @@ public class TestLogRollingNoCluster {
WALEdit edit = new WALEdit();
byte[] bytes = Bytes.toBytes(i);
edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
this.wal.append(HRegionInfo.FIRST_META_REGIONINFO,
HTableDescriptor.META_TABLEDESC.getTableName(),
edit, now, HTableDescriptor.META_TABLEDESC, sequenceId);
@ -135,6 +136,13 @@ public class TestLogRollingNoCluster {
} catch (Exception e) {
this.e = e;
log.info("Caught exception from Appender:" + getName(), e);
} finally {
// Call sync on our log.else threads just hang out.
try {
this.wal.sync();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -899,6 +899,7 @@
<commons-lang.version>2.6</commons-lang.version>
<commons-logging.version>1.1.3</commons-logging.version>
<commons-math.version>2.2</commons-math.version>
<disruptor.version>3.2.0</disruptor.version>
<collections.version>3.2.1</collections.version>
<httpclient.version>3.1</httpclient.version>
<metrics-core.version>2.1.2</metrics-core.version>
@ -1362,6 +1363,11 @@
<artifactId>htrace-core</artifactId>
<version>${htrace.version}</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Dependencies needed by subprojects -->