From 3e351091b6695c6344ad382ee28db8af0627e2e5 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Sat, 2 Feb 2008 00:36:53 +0000 Subject: [PATCH] HADOOP-2731 Under load, regions become extremely large and eventually cause region servers to become unresponsive git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk/src/contrib/hbase@617720 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + conf/hbase-default.xml | 6 +- src/java/org/apache/hadoop/hbase/HLog.java | 5 +- src/java/org/apache/hadoop/hbase/HRegion.java | 32 +-- .../apache/hadoop/hbase/HRegionServer.java | 189 ++++++------------ src/java/org/apache/hadoop/hbase/HStore.java | 19 +- .../org/apache/hadoop/hbase/HStoreKey.java | 4 +- .../apache/hadoop/hbase/MultiRegionTable.java | 124 ++++++------ .../apache/hadoop/hbase/TestCompaction.java | 1 + .../hadoop/hbase/mapred/TestTableIndex.java | 10 +- 10 files changed, 168 insertions(+), 224 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 2607c231b78..5ddb6a6b24d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -13,6 +13,8 @@ Trunk (unreleased changes) OPTIMIZATIONS BUG FIXES + HADOOP-2731 Under load, regions become extremely large and eventually cause + region servers to become unresponsive IMPROVEMENTS HADOOP-2555 Refactor the HTable#get and HTable#getRow methods to avoid diff --git a/conf/hbase-default.xml b/conf/hbase-default.xml index b7b32269070..ecb273a2304 100644 --- a/conf/hbase-default.xml +++ b/conf/hbase-default.xml @@ -164,7 +164,7 @@ hbase.hregion.memcache.block.multiplier - 2 + 1 Block updates if memcache has hbase.hregion.block.memcache time hbase.hregion.flush.size bytes. Useful preventing @@ -179,7 +179,7 @@ 268435456 Maximum HStoreFile size. If any one of a column families' HStoreFiles has - grown to exceed value + (value / 2), the hosting HRegion is split in two. + grown to exceed this value, the hosting HRegion is split in two. Default: 256M. @@ -200,7 +200,7 @@ hbase.regionserver.thread.splitcompactcheckfrequency - 15000 + 20000 How often a region server runs the split/compaction check. diff --git a/src/java/org/apache/hadoop/hbase/HLog.java b/src/java/org/apache/hadoop/hbase/HLog.java index c6937c54092..f63b438a849 100644 --- a/src/java/org/apache/hadoop/hbase/HLog.java +++ b/src/java/org/apache/hadoop/hbase/HLog.java @@ -262,9 +262,12 @@ public class HLog implements HConstants { break; } } - LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " + + if (LOG.isDebugEnabled() && sequenceNumbers.size() > 0) { + LOG.debug("Found " + sequenceNumbers.size() + + " logs to remove " + "using oldest outstanding seqnum of " + oldestOutstandingSeqNum + " from region " + oldestRegion); + } } if (sequenceNumbers.size() > 0) { for (Long seq : sequenceNumbers) { diff --git a/src/java/org/apache/hadoop/hbase/HRegion.java b/src/java/org/apache/hadoop/hbase/HRegion.java index c2b0ef0debc..101f5dbe254 100644 --- a/src/java/org/apache/hadoop/hbase/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/HRegion.java @@ -218,8 +218,6 @@ public class HRegion implements HConstants { volatile boolean compacting = false; // Gets set in close. If set, cannot compact or flush again. volatile boolean writesEnabled = true; - // Gets set in close to prevent new compaction starting - volatile boolean disableCompactions = false; } volatile WriteState writestate = new WriteState(); @@ -319,7 +317,7 @@ public class HRegion implements HConstants { 1024*1024*64); this.flushListener = listener; this.blockingMemcacheSize = this.memcacheFlushSize * - conf.getInt("hbase.hregion.memcache.block.multiplier", 2); + conf.getInt("hbase.hregion.memcache.block.multiplier", 1); // By default we split region if a file > DEFAULT_MAX_FILE_SIZE. this.desiredMaxFileSize = @@ -390,11 +388,11 @@ public class HRegion implements HConstants { } synchronized (splitLock) { synchronized (writestate) { - // Can be a compaction running and it can take a long time to - // complete -- minutes. Meantime, we want flushes to keep happening - // if we are taking on lots of updates. But we don't want another - // compaction to start so set disableCompactions flag. - this.writestate.disableCompactions = true; + // Disable compacting and flushing by background threads for this + // region. + writestate.writesEnabled = false; + LOG.debug("compactions and cache flushes disabled for region " + + regionName); while (writestate.compacting || writestate.flushing) { LOG.debug("waiting for" + (writestate.compacting ? " compaction" : "") + @@ -409,11 +407,6 @@ public class HRegion implements HConstants { // continue } } - // Disable compacting and flushing by background threads for this - // region. - writestate.writesEnabled = false; - LOG.debug("compactions and cache flushes disabled for region " + - regionName); } lock.writeLock().lock(); LOG.debug("new updates and scanners for region " + regionName + @@ -671,8 +664,7 @@ public class HRegion implements HConstants { (midKey.equals(getStartKey()) && midKey.equals(getEndKey())) ) { return false; } - long triggerSize = this.desiredMaxFileSize + (this.desiredMaxFileSize / 2); - boolean split = (biggest.getAggregate() >= triggerSize); + boolean split = (biggest.getAggregate() >= this.desiredMaxFileSize); if (split) { if (!biggest.isSplitable()) { LOG.warn("Region " + getRegionName().toString() + @@ -710,10 +702,6 @@ public class HRegion implements HConstants { } } if (!needsCompaction) { - if (LOG.isDebugEnabled()) { - LOG.debug("region " + regionInfo.getRegionName() + - " does not need compaction"); - } return false; } return compactStores(); @@ -768,15 +756,13 @@ public class HRegion implements HConstants { } try { synchronized (writestate) { - if (!writestate.compacting && writestate.writesEnabled && - !this.writestate.disableCompactions) { + if (!writestate.compacting && writestate.writesEnabled) { writestate.compacting = true; } else { LOG.info("NOT compacting region " + this.regionInfo.getRegionName().toString() + ": compacting=" + writestate.compacting + ", writesEnabled=" + - writestate.writesEnabled + ", writestate.disableCompactions=" + - this.writestate.disableCompactions); + writestate.writesEnabled); return false; } } diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java index 1f1c886dc9e..eb6a6082be4 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java @@ -217,77 +217,49 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { this.expirationTime = expirationTime; } } - - // Check to see if regions should be split - final Splitter splitter; - // Needed at shutdown. On way out, if can get this lock then we are not in - // middle of a split or compaction: i.e. splits/compactions cannot be - // interrupted. - final Integer splitterLock = new Integer(0); - - /** Split regions on request */ - class Splitter extends Thread implements RegionUnavailableListener { - private final BlockingQueue splitQueue = - new LinkedBlockingQueue(); + // Compactions + final CompactSplitThread compactSplitThread; + // Needed during shutdown so we send an interrupt after completion of a + // compaction, not in the midst. + final Integer compactSplitLock = new Integer(0); + + /** Compact region on request and then run split if appropriate + */ + private class CompactSplitThread extends Thread + implements RegionUnavailableListener { private HTable root = null; private HTable meta = null; private long startTime; + private final long frequency; + + private final BlockingQueue compactionQueue = + new LinkedBlockingQueue(); /** constructor */ - public Splitter() { + public CompactSplitThread() { super(); - } - - /** {@inheritDoc} */ - public void closing(final Text regionName) { - startTime = System.currentTimeMillis(); - lock.writeLock().lock(); - try { - // Remove region from regions Map and add it to the Map of retiring - // regions. - retiringRegions.put(regionName, onlineRegions.remove(regionName)); - if (LOG.isDebugEnabled()) { - LOG.debug(regionName.toString() + " closing (" + - "Adding to retiringRegions)"); - } - } finally { - lock.writeLock().unlock(); - } + this.frequency = + conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency", + 20 * 1000); } /** {@inheritDoc} */ - public void closed(final Text regionName) { - lock.writeLock().lock(); - try { - retiringRegions.remove(regionName); - if (LOG.isDebugEnabled()) { - LOG.debug(regionName.toString() + " closed"); - } - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Perform region splits if necessary - */ @Override public void run() { while (!stopRequested.get()) { QueueEntry e = null; try { - e = splitQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); if (e == null) { continue; } - synchronized (splitterLock) { // Don't interrupt us while we're working - split(e.getRegion()); - } + e.getRegion().compactIfNeeded(); + split(e.getRegion()); } catch (InterruptedException ex) { continue; } catch (IOException ex) { - LOG.error("Split failed" + + LOG.error("Compaction failed" + (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), RemoteExceptionHandler.checkIOException(ex)); if (!checkFileSystem()) { @@ -295,7 +267,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } } catch (Exception ex) { - LOG.error("Split failed" + + LOG.error("Compaction failed" + (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), ex); if (!checkFileSystem()) { @@ -307,18 +279,22 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } /** - * @param e entry indicating which region needs to be split + * @param e QueueEntry for region to be compacted */ - public void splitRequested(QueueEntry e) { - splitQueue.add(e); + public void compactionRequested(QueueEntry e) { + compactionQueue.add(e); + } + + void compactionRequested(final HRegion r) { + compactionRequested(new QueueEntry(r, System.currentTimeMillis())); } private void split(final HRegion region) throws IOException { final HRegionInfo oldRegionInfo = region.getRegionInfo(); final HRegion[] newRegions = region.splitRegion(this); - if (newRegions == null) { - return; // Didn't need to be split + // Didn't need to be split + return; } // When a region is split, the META table needs to updated if we're @@ -374,65 +350,35 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { // Do not serve the new regions. Let the Master assign them. } - } - - // Compactions - final Compactor compactor; - // Needed during shutdown so we send an interrupt after completion of a - // compaction, not in the midst. - final Integer compactionLock = new Integer(0); - - /** Compact region on request */ - class Compactor extends Thread { - private final BlockingQueue compactionQueue = - new LinkedBlockingQueue(); - - /** constructor */ - public Compactor() { - super(); + + /** {@inheritDoc} */ + public void closing(final Text regionName) { + startTime = System.currentTimeMillis(); + lock.writeLock().lock(); + try { + // Remove region from regions Map and add it to the Map of retiring + // regions. + retiringRegions.put(regionName, onlineRegions.remove(regionName)); + if (LOG.isDebugEnabled()) { + LOG.debug(regionName.toString() + " closing (" + + "Adding to retiringRegions)"); + } + } finally { + lock.writeLock().unlock(); + } } /** {@inheritDoc} */ - @Override - public void run() { - while (!stopRequested.get()) { - QueueEntry e = null; - try { - e = compactionQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); - if (e == null) { - continue; - } - if (e.getRegion().compactIfNeeded()) { - splitter.splitRequested(e); - } - - } catch (InterruptedException ex) { - continue; - } catch (IOException ex) { - LOG.error("Compaction failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), - RemoteExceptionHandler.checkIOException(ex)); - if (!checkFileSystem()) { - break; - } - - } catch (Exception ex) { - LOG.error("Compaction failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), - ex); - if (!checkFileSystem()) { - break; - } + public void closed(final Text regionName) { + lock.writeLock().lock(); + try { + retiringRegions.remove(regionName); + if (LOG.isDebugEnabled()) { + LOG.debug(regionName.toString() + " closed"); } + } finally { + lock.writeLock().unlock(); } - LOG.info(getName() + " exiting"); - } - - /** - * @param e QueueEntry for region to be compacted - */ - public void compactionRequested(QueueEntry e) { - compactionQueue.add(e); } } @@ -469,7 +415,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } synchronized(cacheFlusherLock) { // Don't interrupt while we're working if (e.getRegion().flushcache()) { - compactor.compactionRequested(e); + compactSplitThread.compactionRequested(e); } e.setExpirationTime(System.currentTimeMillis() + @@ -650,10 +596,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { this.cacheFlusher = new Flusher(); // Compaction thread - this.compactor = new Compactor(); - - // Region split thread - this.splitter = new Splitter(); + this.compactSplitThread = new CompactSplitThread(); // Log rolling thread this.logRoller = new LogRoller(); @@ -846,11 +789,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { synchronized(cacheFlusherLock) { this.cacheFlusher.interrupt(); } - synchronized (compactionLock) { - this.compactor.interrupt(); - } - synchronized (splitterLock) { - this.splitter.interrupt(); + synchronized (compactSplitLock) { + this.compactSplitThread.interrupt(); } synchronized (logRollerLock) { this.logRoller.interrupt(); @@ -972,9 +912,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { handler); Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher", handler); - Threads.setDaemonThreadRunning(this.compactor, n + ".compactor", + Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor", handler); - Threads.setDaemonThreadRunning(this.splitter, n + ".splitter", handler); Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler); // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. @@ -1038,8 +977,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { void join() { join(this.workerThread); join(this.cacheFlusher); - join(this.compactor); - join(this.splitter); + join(this.compactSplitThread); join(this.logRoller); } @@ -1219,7 +1157,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { ), this.log, this.fs, conf, regionInfo, null, this.cacheFlusher ); - + // Startup a compaction early if one is needed. + this.compactSplitThread.compactionRequested(region); } catch (IOException e) { LOG.error("error opening region " + regionInfo.getRegionName(), e); diff --git a/src/java/org/apache/hadoop/hbase/HStore.java b/src/java/org/apache/hadoop/hbase/HStore.java index fcadf636c1a..330f9d7bc3f 100644 --- a/src/java/org/apache/hadoop/hbase/HStore.java +++ b/src/java/org/apache/hadoop/hbase/HStore.java @@ -1155,15 +1155,22 @@ public class HStore implements HConstants { * @return True if this store needs compaction. */ boolean needsCompaction() { - boolean compactionNeeded = false; + return this.storefiles != null && + (this.storefiles.size() >= this.compactionThreshold || hasReferences()); + } + + /* + * @return True if this store has references. + */ + private boolean hasReferences() { if (this.storefiles != null) { - compactionNeeded = this.storefiles.size() >= this.compactionThreshold; - if (LOG.isDebugEnabled()) { - LOG.debug("compaction for HStore " + storeName + - (compactionNeeded ? " " : " not ") + "needed."); + for (HStoreFile hsf: this.storefiles.values()) { + if (hsf.isReference()) { + return true; + } } } - return compactionNeeded; + return false; } /** diff --git a/src/java/org/apache/hadoop/hbase/HStoreKey.java b/src/java/org/apache/hadoop/hbase/HStoreKey.java index d9814427979..38103504c2b 100644 --- a/src/java/org/apache/hadoop/hbase/HStoreKey.java +++ b/src/java/org/apache/hadoop/hbase/HStoreKey.java @@ -68,13 +68,13 @@ public class HStoreKey implements WritableComparable { /** * Create an HStoreKey specifying the row and column names - * The timestamp defaults to Long.MAX_VALUE + * The timestamp defaults to LATEST_TIMESTAMP * * @param row row key * @param column column key */ public HStoreKey(Text row, Text column) { - this(row, column, Long.MAX_VALUE); + this(row, column, HConstants.LATEST_TIMESTAMP); } /** diff --git a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java index 18c9dad8030..c5c85a958c6 100644 --- a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java +++ b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java @@ -74,6 +74,9 @@ public class MultiRegionTable extends HBaseTestCase { HRegionInfo parent = t.getRegionLocation(HConstants.EMPTY_START_ROW).getRegionInfo(); LOG.info("Parent region " + parent.toString()); + Path parentDir = HRegion.getRegionDir(new Path(d, tableName), + parent.getEncodedName()); + assertTrue(fs.exists(parentDir)); // Now add content. addContent(new HTableIncommon(t), columnName); LOG.info("Finished content loading"); @@ -125,72 +128,73 @@ public class MultiRegionTable extends HBaseTestCase { // region we have been dealing with up to this. Its the parent of the // region split. Map data = getSplitParentInfo(meta, parent); - parent = Writables.getHRegionInfoOrNull(data.get(HConstants.COL_REGIONINFO)); - LOG.info("Found parent region: " + parent); - assertTrue(parent.isOffline()); - assertTrue(parent.isSplit()); - HRegionInfo splitA = - Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITA)); - HRegionInfo splitB = - Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITB)); - Path parentDir = HRegion.getRegionDir(new Path(d, tableName), - parent.getEncodedName()); - assertTrue(fs.exists(parentDir)); - - LOG.info("Split happened. Parent is " + parent.getRegionName()); - - // Recalibrate will cause us to wait on new regions' deployment - recalibrate(t, new Text(columnName), retries, waitTime); - - if (splitA == null) { - LOG.info("splitA was already null. Assuming it was previously compacted."); + if (data == null) { + // We changed stuff so daughters get cleaned up much faster now. Can + // run so fast, parent has been deleted by time we get to here. } else { - LOG.info("Daughter splitA: " + splitA.getRegionName()); - // Compact a region at a time so we can test case where one region has - // no references but the other still has some - compact(cluster, splitA); - - // Wait till the parent only has reference to remaining split, one that - // still has references. - while (true) { - data = getSplitParentInfo(meta, parent); - if (data == null || data.size() == 3) { - try { - Thread.sleep(waitTime); - } catch (InterruptedException e) { - // continue + parent = Writables.getHRegionInfoOrNull(data.get(HConstants.COL_REGIONINFO)); + LOG.info("Found parent region: " + parent); + assertTrue(parent.isOffline()); + assertTrue(parent.isSplit()); + HRegionInfo splitA = + Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITA)); + HRegionInfo splitB = + Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITB)); + assertTrue(fs.exists(parentDir)); + LOG.info("Split happened. Parent is " + parent.getRegionName()); + + // Recalibrate will cause us to wait on new regions' deployment + recalibrate(t, new Text(columnName), retries, waitTime); + + if (splitA == null) { + LOG.info("splitA was already null. Assuming it was previously compacted."); + } else { + LOG.info("Daughter splitA: " + splitA.getRegionName()); + // Compact a region at a time so we can test case where one region has + // no references but the other still has some + compact(cluster, splitA); + + // Wait till the parent only has reference to remaining split, one that + // still has references. + while (true) { + data = getSplitParentInfo(meta, parent); + if (data == null || data.size() == 3) { + try { + Thread.sleep(waitTime); + } catch (InterruptedException e) { + // continue + } + continue; } - continue; + break; } - break; + LOG.info("Parent split info returned " + data.keySet().toString()); } - LOG.info("Parent split info returned " + data.keySet().toString()); + + if (splitB == null) { + LOG.info("splitB was already null. Assuming it was previously compacted."); + } else { + LOG.info("Daughter splitB: " + splitA.getRegionName()); + + // Call second split. + compact(cluster, splitB); + } + // Now wait until parent disappears. + LOG.info("Waiting on parent " + parent.getRegionName() + " to disappear"); + for (int i = 0; i < retries; i++) { + if (getSplitParentInfo(meta, parent) == null) { + break; + } + + try { + Thread.sleep(waitTime); + } catch (InterruptedException e) { + // continue + } + } + assertNull(getSplitParentInfo(meta, parent)); } - if (splitB == null) { - LOG.info("splitB was already null. Assuming it was previously compacted."); - } else { - LOG.info("Daughter splitB: " + splitA.getRegionName()); - - // Call second split. - compact(cluster, splitB); - } - - // Now wait until parent disappears. - LOG.info("Waiting on parent " + parent.getRegionName() + " to disappear"); - for (int i = 0; i < retries; i++) { - if (getSplitParentInfo(meta, parent) == null) { - break; - } - - try { - Thread.sleep(waitTime); - } catch (InterruptedException e) { - // continue - } - } - assertNull(getSplitParentInfo(meta, parent)); - // Assert cleaned up. for (int i = 0; i < retries; i++) { diff --git a/src/test/org/apache/hadoop/hbase/TestCompaction.java b/src/test/org/apache/hadoop/hbase/TestCompaction.java index 41fdcedd960..555596ecb31 100644 --- a/src/test/org/apache/hadoop/hbase/TestCompaction.java +++ b/src/test/org/apache/hadoop/hbase/TestCompaction.java @@ -51,6 +51,7 @@ public class TestCompaction extends HBaseTestCase { // Set cache flush size to 1MB conf.setInt("hbase.hregion.memcache.flush.size", 1024*1024); + conf.setInt("hbase.hregion.memcache.block.multiplier", 2); this.cluster = null; } diff --git a/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java b/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java index 306295c4f7f..e5a32df0a02 100644 --- a/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java +++ b/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java @@ -48,6 +48,8 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MultiSearcher; @@ -83,16 +85,16 @@ public class TestTableIndex extends MultiRegionTable { /** {@inheritDoc} */ @Override public void setUp() throws Exception { + // Enable DEBUG-level MR logging. + Logger.getLogger("org.apache.hadoop.mapred").setLevel(Level.DEBUG); + // Make sure the cache gets flushed so we trigger a compaction(s) and // hence splits. conf.setInt("hbase.hregion.memcache.flush.size", 1024 * 1024); - - // Always compact if there is more than one store file. - conf.setInt("hbase.hstore.compactionThreshold", 2); // This size should make it so we always split using the addContent // below. After adding all data, the first region is 1.3M - conf.setLong("hbase.hregion.max.filesize", 128 * 1024); + conf.setLong("hbase.hregion.max.filesize", 1024 * 1024); desc = new HTableDescriptor(TABLE_NAME); desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));