diff --git a/CHANGES.txt b/CHANGES.txt index ee2469b2cc3..0e8905fc414 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,8 @@ Release 0.93.0 - Unreleased HBASE-4460 Support running an embedded ThriftServer within a RegionServer (jgray) HBASE-4536 Allow CF to retain deleted rows (Lars H) HBASE-4629 Enable automated patch testing for hbase (Giridharan Kesavan) + HBASE-4528 The put operation can release the rowlock before sync-ing the + Hlog (dhruba via jgray) IMPROVEMENT HBASE-4132 Extend the WALActionsListener API to accomodate log archival diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c9d65a2c07c..e2356873aaa 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1209,6 +1209,7 @@ public class HRegion implements HeapSize { // , Writable{ // during the flush long sequenceId = -1L; long completeSequenceId = -1L; + ReadWriteConsistencyControl.WriteEntry w = null; // We have to take a write lock during snapshot, or else a write could // end up in both snapshot and memstore (makes it difficult to do atomic @@ -1219,6 +1220,10 @@ public class HRegion implements HeapSize { // , Writable{ final long currentMemStoreSize = this.memstoreSize.get(); List storeFlushers = new ArrayList(stores.size()); try { + // Record the rwcc for all transactions in progress. + w = rwcc.beginMemstoreInsert(); + rwcc.advanceMemstore(w); + sequenceId = (wal == null)? myseqid : wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes()); completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId); @@ -1234,8 +1239,17 @@ public class HRegion implements HeapSize { // , Writable{ } finally { this.updatesLock.writeLock().unlock(); } - status.setStatus("Flushing stores"); + status.setStatus("Waiting for rwcc"); + LOG.debug("Finished snapshotting, commencing waiting for rwcc"); + // wait for all in-progress transactions to commit to HLog before + // we can start the flush. This prevents + // uncommitted transactions from being written into HFiles. + // We have to block before we start the flush, otherwise keys that + // were removed via a rollbackMemstore could be written to Hfiles. + rwcc.waitForRead(w); + + status.setStatus("Flushing stores"); LOG.debug("Finished snapshotting, commencing flushing stores"); // Any failure from here on out will be catastrophic requiring server @@ -1246,15 +1260,17 @@ public class HRegion implements HeapSize { // , Writable{ try { // A. Flush memstore to all the HStores. // Keep running vector of all store files that includes both old and the - // just-made new flush store file. + // just-made new flush store file. The new flushed file is still in the + // tmp directory. for (StoreFlusher flusher : storeFlushers) { flusher.flushCache(status); } + // Switch snapshot (in memstore) -> new hfile (thus causing // all the store scanners to reset/reseek). for (StoreFlusher flusher : storeFlushers) { - boolean needsCompaction = flusher.commit(); + boolean needsCompaction = flusher.commit(status); if (needsCompaction) { compactionRequested = true; } @@ -1483,11 +1499,12 @@ public class HRegion implements HeapSize { // , Writable{ } /** + * This is used only by unit tests. Not required to be a public API. * @param familyMap map of family to edits for the given family. 
* @param writeToWAL * @throws IOException */ - public void delete(Map> familyMap, UUID clusterId, + void delete(Map> familyMap, UUID clusterId, boolean writeToWAL) throws IOException { Delete delete = new Delete(); delete.setFamilyMap(familyMap); @@ -1577,7 +1594,7 @@ public class HRegion implements HeapSize { // , Writable{ } // Now make changes to the memstore. - long addedSize = applyFamilyMapToMemstore(familyMap); + long addedSize = applyFamilyMapToMemstore(familyMap, null); flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize)); if (coprocessorHost != null) { @@ -1745,8 +1762,9 @@ public class HRegion implements HeapSize { // , Writable{ } } - long now = EnvironmentEdgeManager.currentTimeMillis(); - byte[] byteNow = Bytes.toBytes(now); + ReadWriteConsistencyControl.WriteEntry w = null; + long txid = 0; + boolean walSyncSuccessful = false; boolean locked = false; /** Keep track of the locks we hold so we can release them in finally clause */ @@ -1805,6 +1823,12 @@ public class HRegion implements HeapSize { // , Writable{ lastIndexExclusive++; numReadyToWrite++; } + + // we should record the timestamp only after we have acquired the rowLock, + // otherwise, newer puts are not guaranteed to have a newer timestamp + long now = EnvironmentEdgeManager.currentTimeMillis(); + byte[] byteNow = Bytes.toBytes(now); + // Nothing to put -- an exception in the above such as NoSuchColumnFamily? if (numReadyToWrite <= 0) return 0L; @@ -1823,32 +1847,23 @@ public class HRegion implements HeapSize { // , Writable{ byteNow); } - this.updatesLock.readLock().lock(); locked = true; + // // ------------------------------------ - // STEP 3. Write to WAL + // Acquire the latest rwcc number // ---------------------------------- - for (int i = firstIndex; i < lastIndexExclusive; i++) { - // Skip puts that were determined to be invalid during preprocessing - if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { - continue; - } - - Put p = batchOp.operations[i].getFirst(); - if (!p.getWriteToWAL()) continue; - addFamilyMapToWALEdit(familyMaps[i], walEdit); - } - - // Append the edit to WAL - Put first = batchOp.operations[firstIndex].getFirst(); - this.log.append(regionInfo, this.htableDescriptor.getName(), - walEdit, first.getClusterId(), now, this.htableDescriptor); + w = rwcc.beginMemstoreInsert(); // ------------------------------------ - // STEP 4. Write back to memstore + // STEP 3. Write back to memstore + // Write to memstore. It is ok to write to memstore + // first without updating the HLog because we do not roll + // forward the memstore RWCC. The RWCC will be moved up when + // the complete operation is done. These changes are not yet + // visible to scanners till we update the RWCC. The RWCC is + // moved only when the sync is complete. // ---------------------------------- long addedSize = 0; for (int i = firstIndex; i < lastIndexExclusive; i++) { @@ -1856,13 +1871,65 @@ public class HRegion implements HeapSize { // , Writable{ != OperationStatusCode.NOT_RUN) { continue; } - addedSize += applyFamilyMapToMemstore(familyMaps[i]); - batchOp.retCodeDetails[i] = new OperationStatus( - OperationStatusCode.SUCCESS); + addedSize += applyFamilyMapToMemstore(familyMaps[i], w); } // ------------------------------------ - // STEP 5. Run coprocessor post hooks + // STEP 4. 
Build WAL edit + // ---------------------------------- + for (int i = firstIndex; i < lastIndexExclusive; i++) { + // Skip puts that were determined to be invalid during preprocessing + if (batchOp.retCodeDetails[i].getOperationStatusCode() + != OperationStatusCode.NOT_RUN) { + continue; + } + batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.SUCCESS); + + Put p = batchOp.operations[i].getFirst(); + if (!p.getWriteToWAL()) continue; + addFamilyMapToWALEdit(familyMaps[i], walEdit); + } + + // ------------------------- + // STEP 5. Append the edit to the WAL. Do not sync the WAL. + // ------------------------- + Put first = batchOp.operations[firstIndex].getFirst(); + txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(), + walEdit, first.getClusterId(), now, this.htableDescriptor); + + // ------------------------------- + // STEP 6. Release row locks, etc. + // ------------------------------- + if (locked) { + this.updatesLock.readLock().unlock(); + locked = false; + } + if (acquiredLocks != null) { + for (Integer toRelease : acquiredLocks) { + releaseRowLock(toRelease); + } + acquiredLocks = null; + } + // ------------------------- + // STEP 7. Sync the WAL. + // ------------------------- + if (walEdit.size() > 0 && + (this.regionInfo.isMetaRegion() || + !this.htableDescriptor.isDeferredLogFlush())) { + this.log.sync(txid); + } + walSyncSuccessful = true; + // ------------------------------------------------------------------ + // STEP 8. Advance rwcc. This will make this put visible to scanners and getters. + // ------------------------------------------------------------------ + if (w != null) { + rwcc.completeMemstoreInsert(w); + w = null; + } + + // ------------------------------------ + // STEP 9. Run coprocessor post hooks. This should be done after the WAL is + // synced so that the coprocessor contract is adhered to. + // ------------------------------------ if (coprocessorHost != null) { for (int i = firstIndex; i < lastIndexExclusive; i++) { @@ -1879,11 +1946,21 @@ public class HRegion implements HeapSize { // , Writable{ success = true; return addedSize; } finally { - if (locked) - this.updatesLock.readLock().unlock(); - for (Integer toRelease : acquiredLocks) { - releaseRowLock(toRelease); + // if the WAL sync was unsuccessful, remove keys from memstore + if (!walSyncSuccessful) { + rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive); + } + if (w != null) rwcc.completeMemstoreInsert(w); + + if (locked) { + this.updatesLock.readLock().unlock(); + } + + if (acquiredLocks != null) { + for (Integer toRelease : acquiredLocks) { + releaseRowLock(toRelease); + } } if (!success) { for (int i = firstIndex; i < lastIndexExclusive; i++) { @@ -2121,7 +2198,7 @@ public class HRegion implements HeapSize { // , Writable{ walEdit, clusterId, now, this.htableDescriptor); } - long addedSize = applyFamilyMapToMemstore(familyMap); + long addedSize = applyFamilyMapToMemstore(familyMap, null); flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize)); } finally { this.updatesLock.readLock().unlock(); @@ -2143,14 +2220,22 @@ public class HRegion implements HeapSize { // , Writable{ * should already have locked updatesLock.readLock(). This also does * not check the families for validity. * + * @param familyMap Map of kvs per family + * @param localizedWriteEntry The WriteEntry of the RWCC for this transaction. + * If null, then this method internally creates a rwcc transaction. 
* @return the additional memory usage of the memstore caused by the + * new entries. */ - private long applyFamilyMapToMemstore(Map> familyMap) { - ReadWriteConsistencyControl.WriteEntry w = null; + private long applyFamilyMapToMemstore(Map> familyMap, + ReadWriteConsistencyControl.WriteEntry localizedWriteEntry) { long size = 0; + boolean freerwcc = false; + try { - w = rwcc.beginMemstoreInsert(); + if (localizedWriteEntry == null) { + localizedWriteEntry = rwcc.beginMemstoreInsert(); + freerwcc = true; + } for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); @@ -2158,16 +2243,54 @@ public class HRegion implements HeapSize { // , Writable{ Store store = getStore(family); for (KeyValue kv: edits) { - kv.setMemstoreTS(w.getWriteNumber()); + kv.setMemstoreTS(localizedWriteEntry.getWriteNumber()); size += store.add(kv); } } } finally { - rwcc.completeMemstoreInsert(w); + if (freerwcc) { + rwcc.completeMemstoreInsert(localizedWriteEntry); + } } return size; } + /** + * Remove all the keys listed in the map from the memstore. This method is + * called when a Put has updated the memstore but subsequently fails to update + * the WAL. This method is then invoked to roll back the memstore. + */ + private void rollbackMemstore(BatchOperationInProgress> batchOp, + Map>[] familyMaps, + int start, int end) { + int kvsRolledback = 0; + for (int i = start; i < end; i++) { + // skip over requests that never succeeded in the first place. + if (batchOp.retCodeDetails[i].getOperationStatusCode() + != OperationStatusCode.SUCCESS) { + continue; + } + + // Rollback all the kvs for this row. + Map> familyMap = familyMaps[i]; + for (Map.Entry> e : familyMap.entrySet()) { + byte[] family = e.getKey(); + List edits = e.getValue(); + + // Remove those keys from the memstore that match our + // key (row, cf, cq, timestamp, memstoreTS). The interesting part is + // that even the memstoreTS has to match for keys that will be rolled back. + Store store = getStore(family); + for (KeyValue kv: edits) { + store.rollback(kv); + kvsRolledback++; + } + } + } + LOG.debug("rollbackMemstore rolled back " + kvsRolledback + + " keyvalues from start:" + start + " to end:" + end); + } + /** * Check the collection of families for validity. * @throws NoSuchColumnFamilyException if a family does not exist. 
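The HRegion.java hunks above reorder the write path, and the ordering is easier to see outside the diff. The following standalone sketch condenses it: take the row lock, insert into the memstore under a write number that readers cannot see yet, append the edit to the WAL without syncing, drop the row lock, sync the WAL, and only then advance the read point; if the sync fails, the memstore edit is removed again. This is an illustration only, not HBase code: PutOrderingSketch, SimpleRwcc, FakeWal and the single-row memstore map are invented stand-ins, and the real doMiniBatchPut additionally batches rows, honors deferred log flush, and runs coprocessor hooks.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantLock;

public class PutOrderingSketch {

  /** A write number handed out to an in-flight transaction. */
  static final class WriteEntry {
    final long writeNumber;
    WriteEntry(long writeNumber) { this.writeNumber = writeNumber; }
  }

  /** Minimal consistency control: the read point trails the latest committed write. */
  static final class SimpleRwcc {
    private long writePoint = 0;
    private long readPoint = 0;
    synchronized WriteEntry beginMemstoreInsert() { return new WriteEntry(++writePoint); }
    synchronized void completeMemstoreInsert(WriteEntry e) {
      // Single-writer simplification: committing makes this entry (and older ones) visible.
      readPoint = Math.max(readPoint, e.writeNumber);
    }
    synchronized long readPoint() { return readPoint; }
  }

  /** A WAL stand-in: appendNoSync queues the edit and returns a transaction id. */
  static final class FakeWal {
    private long txid = 0;
    private final List<String> unsynced = new ArrayList<>();
    synchronized long appendNoSync(String edit) { unsynced.add(edit); return ++txid; }
    synchronized void sync(long upToTxid) { unsynced.clear(); /* pretend to fsync */ }
  }

  /** row -> { value, writeNumber }; a stand-in for the memstore. */
  static final ConcurrentSkipListMap<String, long[]> MEMSTORE = new ConcurrentSkipListMap<>();
  static final SimpleRwcc RWCC = new SimpleRwcc();
  static final FakeWal WAL = new FakeWal();
  static final ReentrantLock ROW_LOCK = new ReentrantLock();

  /** One put, following the ordering the patch establishes. */
  static void put(String row, long value) {
    WriteEntry w = null;
    long txid = 0;
    boolean walSyncSuccessful = false;
    ROW_LOCK.lock();
    boolean locked = true;
    try {
      w = RWCC.beginMemstoreInsert();
      // 1. Memstore first; invisible to readers until the read point advances.
      MEMSTORE.put(row, new long[] { value, w.writeNumber });
      // 2. Append to the WAL without syncing.
      txid = WAL.appendNoSync(row + "=" + value);
      // 3. Release the row lock before the slow sync.
      ROW_LOCK.unlock();
      locked = false;
      // 4. Sync the WAL.
      WAL.sync(txid);
      walSyncSuccessful = true;
      // 5. Advance the read point; the edit becomes visible to scanners and getters.
      RWCC.completeMemstoreInsert(w);
    } finally {
      if (!walSyncSuccessful && w != null) {
        // Roll back: remove the memstore edit only if its write number still matches ours.
        long[] stored = MEMSTORE.get(row);
        if (stored != null && stored[1] == w.writeNumber) {
          MEMSTORE.remove(row);
        }
      }
      if (locked) {
        ROW_LOCK.unlock();
      }
    }
  }

  public static void main(String[] args) {
    put("rowA", 42L);
    System.out.println("readPoint=" + RWCC.readPoint() + ", rowA=" + MEMSTORE.get("rowA")[0]);
  }
}

Releasing the row lock before the sync is what restores write parallelism; correctness holds because the edit stays invisible until the sync succeeds, and rollbackMemstore (with its matching memstoreTS check) removes it if the sync fails.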
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java b/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java index 0c1bd3344e9..51df1ee2a03 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java @@ -165,6 +165,10 @@ class KeyValueSkipListSet implements NavigableSet { throw new UnsupportedOperationException("Not implemented"); } + public KeyValue get(KeyValue kv) { + return this.delegatee.get(kv); + } + public int size() { return this.delegatee.size(); } @@ -176,4 +180,4 @@ class KeyValueSkipListSet implements NavigableSet { public T[] toArray(T[] a) { throw new UnsupportedOperationException("Not implemented"); } -} \ No newline at end of file +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 34263e46223..747a90b783d 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -249,6 +249,38 @@ public class MemStore implements HeapSize { return newKv; } + /** + * Remove a key from the memstore. Only kvs that have the same key and the + * same memstoreTS are removed. It is ok to not update timeRangeTracker + * in this call. It is possible that we can optimize this method by using + * tailMap/iterator, but since this method is called rarely (only for + * error recovery), we can leave those optimizations for the future. + * @param kv + */ + void rollback(final KeyValue kv) { + this.lock.readLock().lock(); + try { + // If the key is in the snapshot, delete it. We should not update + // this.size, because that tracks the size of only the memstore and + // not the snapshot. The flush of this snapshot to disk has not + // yet started because Store.flush() waits for all rwcc transactions to + // commit before starting the flush to disk. + KeyValue found = this.snapshot.get(kv); + if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) { + this.snapshot.remove(kv); + } + // If the key is in the memstore, delete it. Update this.size. + found = this.kvset.get(kv); + if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) { + this.kvset.remove(kv); + long s = heapSizeChange(kv, true); + this.size.addAndGet(-s); + } + } finally { + this.lock.readLock().unlock(); + } + } + /** * Write a delete * @param delete diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java index 8ec53d31d2a..e68d9866f31 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java @@ -87,6 +87,11 @@ public class ReadWriteConsistencyControl { } public void completeMemstoreInsert(WriteEntry e) { + advanceMemstore(e); + waitForRead(e); + } + + boolean advanceMemstore(WriteEntry e) { synchronized (writeQueue) { e.markCompleted(); @@ -120,10 +125,19 @@ public class ReadWriteConsistencyControl { memstoreRead = nextReadValue; readWaiters.notifyAll(); } - } + if (memstoreRead >= e.getWriteNumber()) { + return true; + } + return false; } + } + /** + * Wait for the global readPoint to advance up to + * the specified transaction number. 
+ */ + public void waitForRead(WriteEntry e) { boolean interrupted = false; synchronized (readWaiters) { while (memstoreRead < e.getWriteNumber()) { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 44e9e5a9dff..d0b630ea021 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.NavigableSet; import java.util.SortedSet; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -321,6 +322,22 @@ public class Store implements HeapSize { } } + /** + * Removes a kv from the memstore. The KeyValue is removed only + * if its key & memstoreTS matches the key & memstoreTS value of the + * kv parameter. + * + * @param kv + */ + protected void rollback(final KeyValue kv) { + lock.readLock().lock(); + try { + this.memstore.rollback(kv); + } finally { + lock.readLock().unlock(); + } + } + /** * @return All store files. */ @@ -447,34 +464,41 @@ public class Store implements HeapSize { * @param logCacheFlushId flush sequence number * @param snapshot * @param snapshotTimeRangeTracker - * @return true if a compaction is needed + * @param flushedSize The number of bytes flushed + * @param status + * @return Path The path name of the tmp file to which the store was flushed * @throws IOException */ - private StoreFile flushCache(final long logCacheFlushId, + private Path flushCache(final long logCacheFlushId, SortedSet snapshot, TimeRangeTracker snapshotTimeRangeTracker, + AtomicLong flushedSize, MonitoredTask status) throws IOException { // If an exception happens flushing, we let it out without clearing // the memstore snapshot. The old snapshot will be returned when we say // 'snapshot', the next time flush comes around. return internalFlushCache( - snapshot, logCacheFlushId, snapshotTimeRangeTracker, status); + snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status); } /* * @param cache * @param logCacheFlushId - * @return StoreFile created. + * @param snapshotTimeRangeTracker + * @param flushedSize The number of bytes flushed + * @return Path The path name of the tmp file to which the store was flushed * @throws IOException */ - private StoreFile internalFlushCache(final SortedSet set, + private Path internalFlushCache(final SortedSet set, final long logCacheFlushId, TimeRangeTracker snapshotTimeRangeTracker, + AtomicLong flushedSize, MonitoredTask status) throws IOException { StoreFile.Writer writer; String fileName; long flushed = 0; + Path pathName; // Don't flush if there are no entries. if (set.size() == 0) { return null; @@ -496,7 +520,7 @@ public class Store implements HeapSize { // A. 
Write the map out to the disk writer = createWriterInTmp(set.size()); writer.setTimeRangeTracker(snapshotTimeRangeTracker); - fileName = writer.getPath().getName(); + pathName = writer.getPath(); try { List kvs = new ArrayList(); boolean hasMore; @@ -520,17 +544,39 @@ public class Store implements HeapSize { } } } finally { + flushedSize.set(flushed); scanner.close(); } + if (LOG.isInfoEnabled()) { + LOG.info("Flushed " + + ", sequenceid=" + logCacheFlushId + + ", memsize=" + StringUtils.humanReadableInt(flushed) + + ", into tmp file " + pathName); + } + return pathName; + } + /* + * @param path The pathname of the tmp file into which the store was flushed + * @param logCacheFlushId + * @return StoreFile created. + * @throws IOException + */ + private StoreFile commitFile(final Path path, + final long logCacheFlushId, + TimeRangeTracker snapshotTimeRangeTracker, + AtomicLong flushedSize, + MonitoredTask status) + throws IOException { // Write-out finished successfully, move into the right spot + String fileName = path.getName(); Path dstPath = new Path(homedir, fileName); - validateStoreFile(writer.getPath()); - String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath; + validateStoreFile(path); + String msg = "Renaming flushed file at " + path + " to " + dstPath; LOG.info(msg); status.setStatus("Flushing " + this + ": " + msg); - if (!fs.rename(writer.getPath(), dstPath)) { - LOG.warn("Unable to rename " + writer.getPath() + " to " + dstPath); + if (!fs.rename(path, dstPath)) { + LOG.warn("Unable to rename " + path + " to " + dstPath); } status.setStatus("Flushing " + this + ": reopening flushed file"); @@ -546,11 +592,10 @@ public class Store implements HeapSize { // HRegion.internalFlushcache, which indirectly calls this to actually do // the flushing through the StoreFlusherImpl class HRegion.incrNumericPersistentMetric("cf." + this.toString() + ".flushSize", - flushed); + flushedSize.longValue()); if(LOG.isInfoEnabled()) { LOG.info("Added " + sf + ", entries=" + r.getEntries() + ", sequenceid=" + logCacheFlushId + - ", memsize=" + StringUtils.humanReadableInt(flushed) + ", filesize=" + StringUtils.humanReadableInt(r.length())); } return sf; @@ -1815,10 +1860,13 @@ public class Store implements HeapSize { private long cacheFlushId; private SortedSet snapshot; private StoreFile storeFile; + private Path storeFilePath; private TimeRangeTracker snapshotTimeRangeTracker; + private AtomicLong flushedSize; private StoreFlusherImpl(long cacheFlushId) { this.cacheFlushId = cacheFlushId; + this.flushedSize = new AtomicLong(); } @Override @@ -1830,15 +1878,17 @@ public class Store implements HeapSize { @Override public void flushCache(MonitoredTask status) throws IOException { - storeFile = Store.this.flushCache( - cacheFlushId, snapshot, snapshotTimeRangeTracker, status); + storeFilePath = Store.this.flushCache( + cacheFlushId, snapshot, snapshotTimeRangeTracker, flushedSize, status); } @Override - public boolean commit() throws IOException { - if (storeFile == null) { + public boolean commit(MonitoredTask status) throws IOException { + if (storeFilePath == null) { return false; } + storeFile = Store.this.commitFile(storeFilePath, cacheFlushId, + snapshotTimeRangeTracker, flushedSize, status); // Add new file to store files. Clear snapshot too while we have // the Store write lock. 
return Store.this.updateStorefiles(storeFile, snapshot); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index c84ba05dfb7..d2eb697bc8a 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -60,5 +60,5 @@ interface StoreFlusher { * @return * @throws IOException */ - boolean commit() throws IOException; + boolean commit(MonitoredTask status) throws IOException; } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index c545e54c3a9..cd6f86429f0 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -1230,7 +1230,7 @@ public class HLog implements Syncable { logSyncerThread.hlogFlush(this.writer); this.writer.sync(); syncBatchSize.addAndGet(doneUpto - this.syncedTillHere); - this.syncedTillHere = doneUpto; + this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto); } catch(IOException io) { syncSuccessful = false; } diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java new file mode 100644 index 00000000000..efa4ffe9b9d --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestCase; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.HConstants.OperationStatusCode; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MultithreadedTestUtil; +import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.NullComparator; +import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.Test; + +import com.google.common.collect.Lists; + + +/** + * Testing of multiPut in parallel. 
+ * + */ +public class TestParallelPut extends HBaseTestCase { + static final Log LOG = LogFactory.getLog(TestParallelPut.class); + + private static HRegion region = null; + private static HBaseTestingUtility hbtu = new HBaseTestingUtility(); + private static final String DIR = hbtu.getDataTestDir() + "/TestParallelPut/"; + + // Test names + static final byte[] tableName = Bytes.toBytes("testtable");; + static final byte[] qual1 = Bytes.toBytes("qual1"); + static final byte[] qual2 = Bytes.toBytes("qual2"); + static final byte[] qual3 = Bytes.toBytes("qual3"); + static final byte[] value1 = Bytes.toBytes("value1"); + static final byte[] value2 = Bytes.toBytes("value2"); + static final byte [] row = Bytes.toBytes("rowA"); + static final byte [] row2 = Bytes.toBytes("rowB"); + + /** + * @see org.apache.hadoop.hbase.HBaseTestCase#setUp() + */ + @Override + protected void setUp() throws Exception { + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + EnvironmentEdgeManagerTestHelper.reset(); + } + + ////////////////////////////////////////////////////////////////////////////// + // New tests that don't spin up a mini cluster but rather just test the + // individual code pieces in the HRegion. + ////////////////////////////////////////////////////////////////////////////// + + /** + * Test one put command. + */ + public void testPut() throws IOException { + LOG.info("Starting testPut"); + initHRegion(tableName, getName(), fam1); + + long value = 1L; + + Put put = new Put(row); + put.add(fam1, qual1, Bytes.toBytes(value)); + region.put(put); + + assertGet(row, fam1, qual1, Bytes.toBytes(value)); + } + + /** + * Test multi-threaded Puts. + */ + public void testParallelPuts() throws IOException { + + LOG.info("Starting testParallelPuts"); + initHRegion(tableName, getName(), fam1); + int numOps = 1000; // these many operations per thread + + // create 100 threads, each will do its own puts + int numThreads = 100; + Putter[] all = new Putter[numThreads]; + + // create all threads + for (int i = 0; i < numThreads; i++) { + all[i] = new Putter(region, i, numOps); + } + + // run all threads + for (int i = 0; i < numThreads; i++) { + all[i].start(); + } + + // wait for all threads to finish + for (int i = 0; i < numThreads; i++) { + try { + all[i].join(); + } catch (InterruptedException e) { + LOG.warn("testParallelPuts encountered InterruptedException." + + " Ignoring....", e); + } + } + LOG.info("testParallelPuts successfully verified " + + (numOps * numThreads) + " put operations."); + } + + + static private void assertGet(byte [] row, + byte [] familiy, + byte[] qualifier, + byte[] value) throws IOException { + // run a get and see if the value matches + Get get = new Get(row); + get.addColumn(familiy, qualifier); + Result result = region.get(get, null); + assertEquals(1, result.size()); + + KeyValue kv = result.raw()[0]; + byte[] r = kv.getValue(); + assertTrue(Bytes.compareTo(r, value) == 0); + } + + private void initHRegion(byte [] tableName, String callingMethod, + byte[] ... families) + throws IOException { + initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families); + } + + private void initHRegion(byte [] tableName, String callingMethod, + Configuration conf, byte [] ... 
families) + throws IOException { + HTableDescriptor htd = new HTableDescriptor(tableName); + for(byte [] family : families) { + htd.addFamily(new HColumnDescriptor(family)); + } + HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false); + Path path = new Path(DIR + callingMethod); + if (fs.exists(path)) { + if (!fs.delete(path, true)) { + throw new IOException("Failed delete of " + path); + } + } + region = HRegion.createHRegion(info, path, conf, htd); + } + + /** + * A thread that makes a few put calls + */ + public static class Putter extends Thread { + + private final HRegion region; + private final int threadNumber; + private final int numOps; + private final Random rand = new Random(); + byte [] rowkey = null; + + public Putter(HRegion region, int threadNumber, int numOps) { + this.region = region; + this.threadNumber = threadNumber; + this.numOps = numOps; + this.rowkey = Bytes.toBytes((long)threadNumber); // unique rowid per thread + setDaemon(true); + } + + @Override + public void run() { + byte[] value = new byte[100]; + Put[] in = new Put[1]; + + // iterate for the specified number of operations + for (int i = 0; i < numOps; i++) { oldFilenum && newFilenum > curTime); + validateData(table, 1003); writeData(table, 1004); @@ -477,6 +491,7 @@ public class TestLogRolling { Thread.sleep(1000); dfsCluster.waitActive(); LOG.info("Data Nodes restarted"); + validateData(table, 1004); // this write should succeed, but trigger a log roll writeData(table, 1005);
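Behind both the new put path and the flush changes sits the ReadWriteConsistencyControl split shown earlier: completeMemstoreInsert() now decomposes into advanceMemstore() plus waitForRead(), and internalFlushcache() uses exactly that pair to wait for every in-flight put to either sync its WAL edit or roll back before the snapshot is written out. The sketch below is a simplified, self-contained rendering of those semantics; the method and field names (beginMemstoreInsert, advanceMemstore, waitForRead, memstoreRead, writeQueue) follow the patch, but the RwccSketch class and its single-monitor bookkeeping are reduced to the minimum needed to show why a flusher that advances its own marker entry still blocks behind an older, uncommitted put.

import java.util.ArrayDeque;

public class RwccSketch {

  static final class WriteEntry {
    final long writeNumber;
    boolean completed = false;
    WriteEntry(long writeNumber) { this.writeNumber = writeNumber; }
  }

  private long memstoreWrite = 0;  // last write number handed out
  private long memstoreRead = 0;   // everything <= this is visible to readers
  private final ArrayDeque<WriteEntry> writeQueue = new ArrayDeque<>();

  /** Start a transaction: allocate the next write number and queue the entry. */
  public synchronized WriteEntry beginMemstoreInsert() {
    WriteEntry e = new WriteEntry(++memstoreWrite);
    writeQueue.addLast(e);
    return e;
  }

  /** Mark the entry complete and advance the read point over the completed prefix. */
  public synchronized boolean advanceMemstore(WriteEntry e) {
    e.completed = true;
    while (!writeQueue.isEmpty() && writeQueue.peekFirst().completed) {
      memstoreRead = writeQueue.pollFirst().writeNumber;
    }
    notifyAll();
    return memstoreRead >= e.writeNumber;
  }

  /** Block until the read point has reached this entry's write number. */
  public synchronized void waitForRead(WriteEntry e) {
    boolean interrupted = false;
    while (memstoreRead < e.writeNumber) {
      try {
        wait();
      } catch (InterruptedException ie) {
        interrupted = true;
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /** The pre-patch single call is now just the two halves in sequence. */
  public void completeMemstoreInsert(WriteEntry e) {
    advanceMemstore(e);
    waitForRead(e);
  }

  public synchronized long readPoint() {
    return memstoreRead;
  }

  public static void main(String[] args) throws InterruptedException {
    RwccSketch rwcc = new RwccSketch();
    WriteEntry olderPut = rwcc.beginMemstoreInsert();      // an in-flight put
    WriteEntry flushMarker = rwcc.beginMemstoreInsert();   // the flusher's marker entry
    rwcc.advanceMemstore(flushMarker);                     // the marker is complete...
    Thread flusher = new Thread(() -> {
      rwcc.waitForRead(flushMarker);                       // ...but must wait for the older put
      System.out.println("flush may proceed, readPoint=" + rwcc.readPoint());
    });
    flusher.start();
    Thread.sleep(100);                                     // the flusher is blocked here
    rwcc.completeMemstoreInsert(olderPut);                 // put commits; read point advances
    flusher.join();
  }
}

Running main() prints "flush may proceed" only after the older put calls completeMemstoreInsert(), which mirrors the new rwcc.waitForRead(w) call in HRegion.internalFlushcache(): the flush cannot write HFiles until every earlier transaction has been committed to the HLog or rolled back out of the memstore.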