HBASE-1974 Update to latest on hadoop 0.21 branch (November11th, 2009)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@835241 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-11-12 05:48:56 +00:00
parent 6babfad71f
commit d89e5f13b0
11 changed files with 24 additions and 31 deletions

View File

@ -1177,8 +1177,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
if (writeToWAL) { if (writeToWAL) {
this.log.append(regionInfo.getRegionName(), this.log.append(regionInfo.getRegionName(),
regionInfo.getTableDesc().getName(), kvs, regionInfo.getTableDesc().getName(), kvs, now);
(regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
} }
flush = isFlushSize(size); flush = isFlushSize(size);
} finally { } finally {
@ -1451,8 +1450,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
if (writeToWAL) { if (writeToWAL) {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
this.log.append(regionInfo.getRegionName(), this.log.append(regionInfo.getRegionName(),
regionInfo.getTableDesc().getName(), edits, regionInfo.getTableDesc().getName(), edits, now);
(regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
} }
long size = 0; long size = 0;
Store store = getStore(family); Store store = getStore(family);
@ -2363,8 +2361,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
List<KeyValue> edits = new ArrayList<KeyValue>(1); List<KeyValue> edits = new ArrayList<KeyValue>(1);
edits.add(newKv); edits.add(newKv);
this.log.append(regionInfo.getRegionName(), this.log.append(regionInfo.getRegionName(),
regionInfo.getTableDesc().getName(), edits, regionInfo.getTableDesc().getName(), edits, now);
(regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
} }
// Now request the ICV to the store, this will set the timestamp // Now request the ICV to the store, this will set the timestamp

View File

@ -649,9 +649,7 @@ public class HLog implements HConstants, Syncable {
// region being flushed is removed if the sequence number of the flush // region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten. // is greater than or equal to the value in lastSeqWritten.
this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum)); this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion(); doWrite(logKey, logEdit, logKey.getWriteTime());
doWrite(logKey, logEdit, sync, logKey.getWriteTime());
this.unflushedEntries.incrementAndGet(); this.unflushedEntries.incrementAndGet();
this.numEntries.incrementAndGet(); this.numEntries.incrementAndGet();
} }
@ -682,12 +680,11 @@ public class HLog implements HConstants, Syncable {
* @param regionName * @param regionName
* @param tableName * @param tableName
* @param edits * @param edits
* @param sync
* @param now * @param now
* @throws IOException * @throws IOException
*/ */
public void append(byte [] regionName, byte [] tableName, List<KeyValue> edits, public void append(byte [] regionName, byte [] tableName, List<KeyValue> edits,
boolean sync, final long now) final long now)
throws IOException { throws IOException {
if (this.closed) { if (this.closed) {
throw new IOException("Cannot append; log is closed"); throw new IOException("Cannot append; log is closed");
@ -702,7 +699,7 @@ public class HLog implements HConstants, Syncable {
int counter = 0; int counter = 0;
for (KeyValue kv: edits) { for (KeyValue kv: edits) {
HLogKey logKey = makeKey(regionName, tableName, seqNum[counter++], now); HLogKey logKey = makeKey(regionName, tableName, seqNum[counter++], now);
doWrite(logKey, kv, sync, now); doWrite(logKey, kv, now);
this.numEntries.incrementAndGet(); this.numEntries.incrementAndGet();
} }
@ -808,13 +805,7 @@ public class HLog implements HConstants, Syncable {
logSyncerThread.addToSyncQueue(force); logSyncerThread.addToSyncQueue(force);
} }
/** public void hflush() throws IOException {
* Multiple threads will call sync() at the same time, only the winner
* will actually flush if there is any race or build up.
*
* @throws IOException
*/
protected void hflush() throws IOException {
synchronized (this.updateLock) { synchronized (this.updateLock) {
if (this.closed) { if (this.closed) {
return; return;
@ -822,6 +813,7 @@ public class HLog implements HConstants, Syncable {
if (this.forceSync || if (this.forceSync ||
this.unflushedEntries.get() >= this.flushlogentries) { this.unflushedEntries.get() >= this.flushlogentries) {
try { try {
LOG.info("hflush remove");
this.writer.sync(); this.writer.sync();
if (this.writer_out != null) { if (this.writer_out != null) {
this.writer_out.sync(); this.writer_out.sync();
@ -837,20 +829,25 @@ public class HLog implements HConstants, Syncable {
} }
} }
public void hsync() throws IOException {
// Not yet implemented up in hdfs so just call hflush.
hflush();
}
private void requestLogRoll() { private void requestLogRoll() {
if (this.listener != null) { if (this.listener != null) {
this.listener.logRollRequested(); this.listener.logRollRequested();
} }
} }
private void doWrite(HLogKey logKey, KeyValue logEdit, boolean sync, private void doWrite(HLogKey logKey, KeyValue logEdit, final long now)
final long now)
throws IOException { throws IOException {
if (!this.enabled) { if (!this.enabled) {
return; return;
} }
try { try {
this.editsSize.addAndGet(logKey.heapSize() + logEdit.heapSize()); this.editsSize.addAndGet(logKey.heapSize() + logEdit.heapSize());
if (this.numEntries.get() % this.flushlogentries == 0) LOG.info("edit=" + this.numEntries.get() + ", write=" + logKey.toString());
this.writer.append(logKey, logEdit); this.writer.append(logKey, logEdit);
long took = System.currentTimeMillis() - now; long took = System.currentTimeMillis() - now;
if (took > 1000) { if (took > 1000) {

View File

@ -333,7 +333,7 @@ public class Merge extends Configured implements Tool {
* *
* @throws IOException * @throws IOException
*/ */
private int parseArgs(String[] args) { private int parseArgs(String[] args) throws IOException {
GenericOptionsParser parser = GenericOptionsParser parser =
new GenericOptionsParser(this.getConf(), args); new GenericOptionsParser(this.getConf(), args);

View File

@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapred.MiniMRCluster;
import com.sun.corba.se.pept.transport.Connection;
/** /**
* Facility for testing HBase. Added as tool to abet junit4 testing. Replaces * Facility for testing HBase. Added as tool to abet junit4 testing. Replaces

View File

@ -92,7 +92,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
System.currentTimeMillis(), column)); System.currentTimeMillis(), column));
System.out.println("Region " + i + ": " + edit); System.out.println("Region " + i + ": " + edit);
log.append(Bytes.toBytes("" + i), tableName, edit, log.append(Bytes.toBytes("" + i), tableName, edit,
false, System.currentTimeMillis()); System.currentTimeMillis());
} }
} }
log.rollWriter(); log.rollWriter();
@ -132,7 +132,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
for (int i = 0; i < total; i++) { for (int i = 0; i < total; i++) {
List<KeyValue> kvs = new ArrayList<KeyValue>(); List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes)); kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
wal.append(bytes, bytes, kvs, false, System.currentTimeMillis()); wal.append(bytes, bytes, kvs, System.currentTimeMillis());
} }
// Now call sync and try reading. Opening a Reader before you sync just // Now call sync and try reading. Opening a Reader before you sync just
// gives you EOFE. // gives you EOFE.
@ -150,7 +150,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
for (int i = 0; i < total; i++) { for (int i = 0; i < total; i++) {
List<KeyValue> kvs = new ArrayList<KeyValue>(); List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes)); kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
wal.append(bytes, bytes, kvs, false, System.currentTimeMillis()); wal.append(bytes, bytes, kvs, System.currentTimeMillis());
} }
reader = HLog.getReader(this.fs, walPath, this.conf); reader = HLog.getReader(this.fs, walPath, this.conf);
count = 0; count = 0;
@ -169,7 +169,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
for (int i = 0; i < total; i++) { for (int i = 0; i < total; i++) {
List<KeyValue> kvs = new ArrayList<KeyValue>(); List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(Bytes.toBytes(i), bytes, value)); kvs.add(new KeyValue(Bytes.toBytes(i), bytes, value));
wal.append(bytes, bytes, kvs, false, System.currentTimeMillis()); wal.append(bytes, bytes, kvs, System.currentTimeMillis());
} }
// Now I should have written out lots of blocks. Sync then read. // Now I should have written out lots of blocks. Sync then read.
wal.sync(); wal.sync();
@ -238,7 +238,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
Bytes.toBytes(Integer.toString(i)), Bytes.toBytes(Integer.toString(i)),
timestamp, new byte[] { (byte)(i + '0') })); timestamp, new byte[] { (byte)(i + '0') }));
} }
log.append(regionName, tableName, cols, false, System.currentTimeMillis()); log.append(regionName, tableName, cols, System.currentTimeMillis());
long logSeqId = log.startCacheFlush(); long logSeqId = log.startCacheFlush();
log.completeCacheFlush(regionName, tableName, logSeqId); log.completeCacheFlush(regionName, tableName, logSeqId);
log.close(); log.close();