HBASE-7878 recoverFileLease does not check return value of recoverLease, third attempt (Ted Yu)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1459786 13f79535-47bb-0310-9956-ffa450edef68
parent 2cb6605bbd
commit 708669b53f
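HBASE-7878: FSHDFSUtils.recoverFileLease() used to fire DistributedFileSystem.recoverLease() through reflection and immediately assume success; this patch captures the Boolean result, retries while it is false, and only falls back to fs.append()/close() after an error or once "hbase.lease.recovery.timeout" expires. The snippet below is a minimal standalone sketch of that loop, not the committed HBase code: the class and method names are made up for illustration, while the reflection call, the timeout key, and the 1-second retry sleep mirror the diff that follows.

    // Minimal sketch of the recovery loop this patch introduces -- NOT the committed
    // HBase code. LeaseRecoverySketch/recover() are illustrative names; the reflection
    // call, the "hbase.lease.recovery.timeout" key and the 1 s retry come from the diff.
    import java.io.IOException;
    import java.lang.reflect.InvocationTargetException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class LeaseRecoverySketch {
      public static void recover(FileSystem fs, Path p, Configuration conf) throws IOException {
        long startWaiting = System.currentTimeMillis();
        long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 300000);
        boolean recovered = false;
        while (!recovered) {
          Exception ex = null;
          try {
            DistributedFileSystem dfs = (DistributedFileSystem) fs;
            // recoverLease() returns false while recovery is still in progress;
            // the old code ignored this value and assumed the lease was released.
            recovered = (Boolean) DistributedFileSystem.class
                .getMethod("recoverLease", new Class[] { Path.class })
                .invoke(dfs, p);
          } catch (InvocationTargetException ite) {
            ex = ite; // recoverLease() itself failed
          } catch (Exception e) {
            ex = e;   // e.g. the method does not exist on this Hadoop version
          }
          if (ex != null || System.currentTimeMillis() - startWaiting > recoveryTimeout) {
            // Fallback: opening the file for append and closing it also forces
            // lease recovery on append-capable Hadoop releases.
            FSDataOutputStream out = fs.append(p);
            out.close();
            recovered = true;
          }
          if (!recovered) {
            try {
              Thread.sleep(1000); // poll recoverLease() again after a short pause
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
              throw new IOException(ie);
            }
          }
        }
      }
    }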
HLogSplitter.java
@@ -26,6 +26,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +35,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
@@ -187,6 +189,20 @@ public class HLogSplitter {
    */
   public List<Path> splitLog()
       throws IOException {
+    return splitLog((CountDownLatch) null);
+  }
+
+  /**
+   * Split up a bunch of regionserver commit log files that are no longer being
+   * written to, into new files, one per region for region to replay on startup.
+   * Delete the old log files when finished.
+   *
+   * @param latch
+   * @throws IOException will throw if corrupted hlogs aren't tolerated
+   * @return the list of splits
+   */
+  public List<Path> splitLog(CountDownLatch latch)
+      throws IOException {
     Preconditions.checkState(!hasSplit,
       "An HLogSplitter instance may only be used once");
     hasSplit = true;
@@ -210,7 +226,7 @@ public class HLogSplitter {
     }
     logAndReport("Splitting " + logfiles.length + " hlog(s) in "
     + srcDir.toString());
-    splits = splitLog(logfiles);
+    splits = splitLog(logfiles, latch);
 
     splitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
     String msg = "hlog file splitting completed in " + splitTime +
@@ -274,7 +290,8 @@ public class HLogSplitter {
    * After the process is complete, the log files are archived to a separate
    * directory.
    */
-  private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {
+  private List<Path> splitLog(final FileStatus[] logfiles, CountDownLatch latch)
+      throws IOException {
     List<Path> processedLogs = new ArrayList<Path>(logfiles.length);
     List<Path> corruptedLogs = new ArrayList<Path>(logfiles.length);
     List<Path> splits;
@@ -322,7 +339,16 @@ public class HLogSplitter {
     }
     status.setStatus("Log splits complete. Checking for orphaned logs.");
 
-    if (fs.listStatus(srcDir).length > processedLogs.size()
+    if (latch != null) {
+      try {
+        latch.await();
+      } catch (InterruptedException ie) {
+        LOG.warn("wait for latch interrupted");
+        Thread.currentThread().interrupt();
+      }
+    }
+    FileStatus[] currFiles = fs.listStatus(srcDir);
+    if (currFiles.length > processedLogs.size()
         + corruptedLogs.size()) {
       throw new OrphanHLogAfterSplitException(
           "Discovered orphan hlog after split. Maybe the "
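The new splitLog(CountDownLatch) overload exists so a test can hold back the orphan-log check until it has dropped an extra log file into the source directory. A self-contained illustration of that latch handshake follows; the class and thread names are hypothetical, and only the CountDownLatch pattern is taken from the patch.

    // Minimal illustration of the CountDownLatch handshake added here -- hypothetical
    // names, not the HBase classes. The "zombie" writer releases the latch only after
    // it has created its extra file, so the splitter's orphan check cannot run too early.
    import java.util.concurrent.CountDownLatch;

    public class LatchHandshakeSketch {
      public static void main(String[] args) throws InterruptedException {
        final CountDownLatch fileCreated = new CountDownLatch(1);

        Thread zombieWriter = new Thread(() -> {
          // ... create an extra log file in the directory being split ...
          fileCreated.countDown(); // now the splitter may look for orphans
        }, "zombie-writer");
        zombieWriter.start();

        // Splitter side: split work is done, wait for the injected file before
        // listing the source directory and flagging unprocessed (orphan) logs.
        fileCreated.await();
        zombieWriter.join();
      }
    }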
FSHDFSUtils.java
@@ -55,6 +55,8 @@ public class FSHDFSUtils extends FSUtils{
    */
   public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;
 
+  public static final String TEST_TRIGGER_DFS_APPEND = "hbase.test.trigger.dfs.append";
+
   @Override
   public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
   throws IOException{
@@ -72,22 +74,37 @@ public class FSHDFSUtils extends FSUtils{
 
     // Trying recovery
     boolean recovered = false;
+    long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 300000);
+    // conf parameter passed from unit test, indicating whether fs.append() should be triggered
+    boolean triggerAppend = conf.getBoolean(TEST_TRIGGER_DFS_APPEND, false);
+    Exception ex = null;
     while (!recovered) {
       try {
         try {
           DistributedFileSystem dfs = (DistributedFileSystem) fs;
-          DistributedFileSystem.class.getMethod("recoverLease", new Class[] { Path.class }).invoke(
-              dfs, p);
+          if (triggerAppend) throw new IOException();
+          try {
+            recovered = (Boolean) DistributedFileSystem.class.getMethod(
+              "recoverLease", new Class[] { Path.class }).invoke(dfs, p);
+            if (!recovered) LOG.debug("recoverLease returned false");
           } catch (InvocationTargetException ite) {
             // function was properly called, but threw it's own exception
             throw (IOException) ite.getCause();
+          }
         } catch (Exception e) {
           LOG.debug("Failed fs.recoverLease invocation, " + e.toString() +
             ", trying fs.append instead");
+          ex = e;
+        }
+        if (ex != null || System.currentTimeMillis() - startWaiting > recoveryTimeout) {
+          LOG.debug("trying fs.append for " + p + " with " + ex);
+          ex = null; // assume the following append() call would succeed
           FSDataOutputStream out = fs.append(p);
           out.close();
-        }
           recovered = true;
+          LOG.debug("fs.append passed");
+        }
+        if (recovered) break;
       } catch (IOException e) {
         e = RemoteExceptionHandler.checkIOException(e);
         if (e instanceof AlreadyBeingCreatedException) {
@@ -111,9 +128,9 @@ public class FSHDFSUtils extends FSUtils{
       }
       try {
         Thread.sleep(1000);
-      } catch (InterruptedException ex) {
+      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
-        iioe.initCause(ex);
+        iioe.initCause(ie);
        throw iioe;
       }
     }
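TEST_TRIGGER_DFS_APPEND ("hbase.test.trigger.dfs.append") is a test-only switch: when set, recoverFileLease() throws before calling recoverLease(), forcing the fs.append() fallback so both branches get exercised. Below is a rough sketch of this configuration-driven fault-injection pattern; only the key string comes from the patch, the class, method, and messages are illustrative.

    // Rough sketch of the configuration-driven fault injection used by the patch.
    // Only the key "hbase.test.trigger.dfs.append" comes from the diff.
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    public class FaultInjectionSketch {
      public static final String TEST_TRIGGER_DFS_APPEND = "hbase.test.trigger.dfs.append";

      // "Production" side: read the flag and, when set, skip straight to the fallback.
      static String recover(Configuration conf) {
        boolean triggerAppend = conf.getBoolean(TEST_TRIGGER_DFS_APPEND, false);
        try {
          if (triggerAppend) {
            throw new IOException("forced by test"); // bypass the recoverLease() path
          }
          return "recovered via recoverLease()";
        } catch (IOException e) {
          return "recovered via append() fallback";
        }
      }

      // "Test" side: flip the flag to exercise the other branch.
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        System.out.println(recover(conf)); // recoverLease() path
        conf.setBoolean(TEST_TRIGGER_DFS_APPEND, true);
        System.out.println(recover(conf)); // forced append() path
      }
    }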
TestHLog.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -49,7 +50,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -99,6 +99,7 @@ public class TestHLog {
     // Make block sizes small.
     TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
     // needed for testAppendClose()
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
     TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
     // quicker heartbeat interval for faster DN death notification
     TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
@@ -370,18 +371,30 @@ public class TestHLog {
     }
   }
 
-  // For this test to pass, requires:
-  // 1. HDFS-200 (append support)
-  // 2. HDFS-988 (SafeMode should freeze file operations
-  //  [FSNamesystem.nextGenerationStampForBlock])
-  // 3. HDFS-142 (on restart, maintain pendingCreates)
+  /*
+   * We pass different values to recoverFileLease() so that different code paths are covered
+   *
+   * For this test to pass, requires:
+   * 1. HDFS-200 (append support)
+   * 2. HDFS-988 (SafeMode should freeze file operations
+   *  [FSNamesystem.nextGenerationStampForBlock])
+   * 3. HDFS-142 (on restart, maintain pendingCreates)
+   */
   @Test
   public void testAppendClose() throws Exception {
+    testAppendClose(true);
+    testAppendClose(false);
+  }
+
+  /*
+   * @param triggerDirectAppend whether to trigger direct call of fs.append()
+   */
+  public void testAppendClose(final boolean triggerDirectAppend) throws Exception {
     byte [] tableName = Bytes.toBytes(getName());
     HRegionInfo regioninfo = new HRegionInfo(tableName,
         HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
 
-    HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir",
+    HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir" + triggerDirectAppend,
         "hlogdir_archive", conf);
     final int total = 20;
 
@@ -456,6 +469,7 @@ public class TestHLog {
       public Exception exception = null;
       public void run() {
           try {
+            rlConf.setBoolean(FSHDFSUtils.TEST_TRIGGER_DFS_APPEND, triggerDirectAppend);
            FSUtils.getInstance(fs, rlConf)
              .recoverFileLease(recoveredFs, walPath, rlConf);
           } catch (IOException e) {
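testAppendClose() is now a thin @Test method that fans out to a parameterized helper, once per recovery path, with the boolean also folded into the WAL directory name so the two runs do not collide. A minimal sketch of that fan-out structure follows (illustrative test names, not the HBase test).

    // Sketch of the two-path fan-out applied to testAppendClose(): the @Test method
    // simply runs a parameterized helper once per recovery path. Names are illustrative.
    import org.junit.Test;

    public class TwoPathRecoveryTestSketch {
      @Test
      public void testRecovery() throws Exception {
        runRecovery(true);   // force the direct fs.append() path
        runRecovery(false);  // exercise the recoverLease() path
      }

      private void runRecovery(boolean triggerDirectAppend) throws Exception {
        // Use a log directory name that includes the flag (as the patch does with
        // "hlogdir" + triggerDirectAppend) so the two runs do not collide, then set
        // the test trigger in the Configuration and drive the recovery code.
      }
    }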
TestHLogSplit.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -137,6 +138,8 @@ public class TestHLogSplit {
     FSUtils.setRootDir(TEST_UTIL.getConfiguration(), HBASEDIR);
     TEST_UTIL.getConfiguration().setClass("hbase.regionserver.hlog.writer.impl",
         InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
     // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
     System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
     // Create fake maping user to group and set it to the conf.
@@ -253,9 +256,9 @@ public class TestHLogSplit {
       try {
         doWriting();
       } catch (IOException e) {
-        flushToConsole(getName() + " Writer exiting " + e);
+        LOG.warn(getName() + " Writer exiting " + e);
       } catch (InterruptedException e) {
-        flushToConsole(getName() + " Writer exiting " + e);
+        LOG.warn(getName() + " Writer exiting " + e);
       }
     }
 
@@ -378,11 +381,12 @@ public class TestHLogSplit {
 
     generateHLogs(-1);
 
+    CountDownLatch latch = new CountDownLatch(1);
     try {
-      (new ZombieNewLogWriterRegionServer(stop)).start();
+      (new ZombieNewLogWriterRegionServer(latch, stop)).start();
       HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
           HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-      logSplitter.splitLog();
+      logSplitter.splitLog(latch);
     } finally {
       stop.set(true);
     }
@@ -713,16 +717,23 @@ public class TestHLogSplit {
     AtomicBoolean stop = new AtomicBoolean(false);
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
-    Thread zombie = new ZombieNewLogWriterRegionServer(stop);
+    CountDownLatch latch = new CountDownLatch(1);
+    Thread zombie = new ZombieNewLogWriterRegionServer(latch, stop);
 
+    List<Path> splits = null;
     try {
       zombie.start();
       try {
         HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
             HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-        logSplitter.splitLog();
-      } catch (IOException ex) {/* expected */}
-      int logFilesNumber = fs.listStatus(HLOGDIR).length;
+        splits = logSplitter.splitLog();
+      } catch (IOException ex) {
+        /* expected */
+        LOG.warn("testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted", ex);
+      }
+      FileStatus[] files = fs.listStatus(HLOGDIR);
+      if (files == null) fail("no files in " + HLOGDIR + " with splits " + splits);
+      int logFilesNumber = files.length;
 
       assertEquals("Log files should not be archived if there's an extra file after split",
           NUM_WRITERS + 1, logFilesNumber);
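The test above also stops dereferencing fs.listStatus() blindly: the result is captured, checked for null, and only then counted. A small sketch of that defensive listing, assuming the older Hadoop behaviour where listStatus() can return null rather than throwing for a missing directory; the class and method names are illustrative.

    // Defensive directory listing, mirroring the null check the test now performs.
    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SafeListingSketch {
      // Counts the entries under dir, treating a null listing as an error rather
      // than letting a NullPointerException hide the real problem.
      public static int countLogs(FileSystem fs, Path dir) throws IOException {
        FileStatus[] files = fs.listStatus(dir);
        if (files == null) {
          throw new IOException("no files in " + dir);
        }
        return files.length;
      }
    }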
@@ -1066,8 +1077,10 @@ public class TestHLogSplit {
    */
   class ZombieNewLogWriterRegionServer extends Thread {
     AtomicBoolean stop;
-    public ZombieNewLogWriterRegionServer(AtomicBoolean stop) {
+    CountDownLatch latch;
+    public ZombieNewLogWriterRegionServer(CountDownLatch latch, AtomicBoolean stop) {
       super("ZombieNewLogWriterRegionServer");
+      this.latch = latch;
       this.stop = stop;
     }
 
@@ -1084,7 +1097,7 @@ public class TestHLogSplit {
       try {
 
         while (!fs.exists(recoveredEdits) && !stop.get()) {
-          flushToConsole("Juliet: split not started, sleeping a bit...");
+          LOG.info("Juliet: split not started, sleeping a bit...");
          Threads.sleep(10);
        }
 
@@ -1094,8 +1107,10 @@ public class TestHLogSplit {
         appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(),
             ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0);
         writer.close();
-        flushToConsole("Juliet file creator: created file " + julietLog);
+        LOG.info("Juliet file creator: created file " + julietLog);
+        latch.countDown();
       } catch (IOException e1) {
+        LOG.error("Failed to create file " + julietLog, e1);
         assertTrue("Failed to create file " + julietLog, false);
       }
     }
@@ -1306,7 +1321,7 @@ public class TestHLogSplit {
       }
       if (i != leaveOpen) {
         ws[i].close();
-        flushToConsole("Closing writer " + i);
+        LOG.info("Closing writer " + i);
       }
     }
     return ws;
@@ -1418,9 +1433,9 @@ public class TestHLogSplit {
       byte[] row, byte[] family, byte[] qualifier,
       byte[] value, long seq)
       throws IOException {
-    flushToConsole(Thread.currentThread().getName() + " append");
+    LOG.info(Thread.currentThread().getName() + " append");
     writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
-    flushToConsole(Thread.currentThread().getName() + " sync");
+    LOG.info(Thread.currentThread().getName() + " sync");
     writer.sync();
     return seq;
   }