HBASE-5542 Unify HRegion.mutateRowsWithLocks() and HRegion.processRow() (Scott Chen)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1303915 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
772ecdd236
commit
6fb055da00
|
@ -49,6 +49,8 @@ import java.util.concurrent.ConcurrentSkipListMap;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
@ -95,7 +97,6 @@ import org.apache.hadoop.hbase.client.RowMutations;
|
|||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Exec;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
|
||||
import org.apache.hadoop.hbase.coprocessor.RowProcessor;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
|
||||
|
@ -232,7 +233,11 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
final Configuration conf;
|
||||
final int rowLockWaitDuration;
|
||||
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
|
||||
static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 10 * 1000L;
|
||||
|
||||
// negative number indicates infinite timeout
|
||||
static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
|
||||
final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
|
||||
|
||||
final HRegionInfo regionInfo;
|
||||
final Path regiondir;
|
||||
KeyValue.KVComparator comparator;
|
||||
|
@ -486,6 +491,10 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
"hbase.hregion.keyvalue.timestamp.slop.millisecs",
|
||||
HConstants.LATEST_TIMESTAMP);
|
||||
|
||||
/**
|
||||
* Timeout for the process time in processRowsWithLocks().
|
||||
* Use -1 to switch off time bound.
|
||||
*/
|
||||
this.rowProcessorTimeout = conf.getLong(
|
||||
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
|
||||
|
||||
|
@ -1676,7 +1685,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
/*
|
||||
* @param delete The passed delete is modified by this method. WARNING!
|
||||
*/
|
||||
private void prepareDelete(Delete delete) throws IOException {
|
||||
void prepareDelete(Delete delete) throws IOException {
|
||||
// Check to see if this is a deleteRow insert
|
||||
if(delete.getFamilyMap().isEmpty()){
|
||||
for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
|
||||
|
@ -1748,7 +1757,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* @param now
|
||||
* @throws IOException
|
||||
*/
|
||||
private void prepareDeleteTimestamps(Delete delete, byte[] byteNow)
|
||||
void prepareDeleteTimestamps(Delete delete, byte[] byteNow)
|
||||
throws IOException {
|
||||
Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
|
||||
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
|
||||
|
@ -2367,7 +2376,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP}
|
||||
* with the provided current timestamp.
|
||||
*/
|
||||
private void updateKVTimestamps(
|
||||
void updateKVTimestamps(
|
||||
final Iterable<List<KeyValue>> keyLists, final byte[] now) {
|
||||
for (List<KeyValue> keys: keyLists) {
|
||||
if (keys == null) continue;
|
||||
|
@ -2591,7 +2600,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* Check the collection of families for validity.
|
||||
* @throws NoSuchColumnFamilyException if a family does not exist.
|
||||
*/
|
||||
private void checkFamilies(Collection<byte[]> families)
|
||||
void checkFamilies(Collection<byte[]> families)
|
||||
throws NoSuchColumnFamilyException {
|
||||
for (byte[] family : families) {
|
||||
checkFamily(family);
|
||||
|
@ -2601,7 +2610,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
checkTimestamps(p.getFamilyMap(), now);
|
||||
}
|
||||
|
||||
private void checkTimestamps(final Map<byte[], List<KeyValue>> familyMap,
|
||||
void checkTimestamps(final Map<byte[], List<KeyValue>> familyMap,
|
||||
long now) throws DoNotRetryIOException {
|
||||
if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
|
||||
return;
|
||||
|
@ -4232,42 +4241,71 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
*/
|
||||
public void mutateRowsWithLocks(Collection<Mutation> mutations,
|
||||
Collection<byte[]> rowsToLock) throws IOException {
|
||||
boolean flush = false;
|
||||
|
||||
MultiRowMutationProcessor proc =
|
||||
new MultiRowMutationProcessor(mutations, rowsToLock);
|
||||
processRowsWithLocks(proc, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs atomic multiple reads and writes on a given row.
|
||||
*
|
||||
* @param processor The object defines the reads and writes to a row.
|
||||
*/
|
||||
public void processRowsWithLocks(RowProcessor<?> processor)
|
||||
throws IOException {
|
||||
processRowsWithLocks(processor, rowProcessorTimeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs atomic multiple reads and writes on a given row.
|
||||
*
|
||||
* @param processor The object defines the reads and writes to a row.
|
||||
* @param timeout The timeout of the processor.process() execution
|
||||
* Use a negative number to switch off the time bound
|
||||
*/
|
||||
public void processRowsWithLocks(RowProcessor<?> processor, long timeout)
|
||||
throws IOException {
|
||||
|
||||
for (byte[] row : processor.getRowsToLock()) {
|
||||
checkRow(row, "processRowsWithLocks");
|
||||
}
|
||||
if (!processor.readOnly()) {
|
||||
checkReadOnly();
|
||||
}
|
||||
checkResources();
|
||||
|
||||
startRegionOperation();
|
||||
List<Integer> acquiredLocks = null;
|
||||
try {
|
||||
// 1. run all pre-hooks before the atomic operation
|
||||
// if any pre hook indicates "bypass", bypass the entire operation
|
||||
WALEdit walEdit = new WALEdit();
|
||||
|
||||
// one WALEdit is used for all edits.
|
||||
WALEdit walEdit = new WALEdit();
|
||||
if (coprocessorHost != null) {
|
||||
for (Mutation m : mutations) {
|
||||
if (m instanceof Put) {
|
||||
if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
|
||||
// by pass everything
|
||||
return;
|
||||
}
|
||||
} else if (m instanceof Delete) {
|
||||
Delete d = (Delete) m;
|
||||
prepareDelete(d);
|
||||
if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) {
|
||||
// by pass everything
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
// 1. Run pre-process hook
|
||||
processor.preProcess(this, walEdit);
|
||||
|
||||
// Short circuit the read only case
|
||||
if (processor.readOnly()) {
|
||||
try {
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
doProcessRowWithTimeout(
|
||||
processor, now, this, null, null, timeout);
|
||||
processor.postProcess(this, walEdit);
|
||||
} finally {
|
||||
closeRegionOperation();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
long txid = 0;
|
||||
boolean walSyncSuccessful = false;
|
||||
boolean locked = false;
|
||||
|
||||
// 2. acquire the row lock(s)
|
||||
MultiVersionConsistencyControl.WriteEntry writeEntry = null;
|
||||
boolean locked = false;
|
||||
boolean walSyncSuccessful = false;
|
||||
List<Integer> acquiredLocks = null;
|
||||
long addedSize = 0;
|
||||
List<KeyValue> mutations = new ArrayList<KeyValue>();
|
||||
Collection<byte[]> rowsToLock = processor.getRowsToLock();
|
||||
try {
|
||||
// 2. Acquire the row lock(s)
|
||||
acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
|
||||
for (byte[] row : rowsToLock) {
|
||||
// attempt to lock all involved rows, fail if one lock times out
|
||||
// Attempt to lock all involved rows, fail if one lock times out
|
||||
Integer lid = getLock(null, row, true);
|
||||
if (lid == null) {
|
||||
throw new IOException("Failed to acquire lock on "
|
||||
|
@ -4275,200 +4313,51 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
acquiredLocks.add(lid);
|
||||
}
|
||||
|
||||
// 3. acquire the region lock
|
||||
// 3. Region lock
|
||||
this.updatesLock.readLock().lock();
|
||||
locked = true;
|
||||
|
||||
// 4. Get a mvcc write number
|
||||
MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
|
||||
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
byte[] byteNow = Bytes.toBytes(now);
|
||||
try {
|
||||
// 5. Check mutations and apply edits to a single WALEdit
|
||||
for (Mutation m : mutations) {
|
||||
if (m instanceof Put) {
|
||||
Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
|
||||
checkFamilies(familyMap.keySet());
|
||||
checkTimestamps(familyMap, now);
|
||||
updateKVTimestamps(familyMap.values(), byteNow);
|
||||
} else if (m instanceof Delete) {
|
||||
Delete d = (Delete) m;
|
||||
prepareDelete(d);
|
||||
prepareDeleteTimestamps(d, byteNow);
|
||||
} else {
|
||||
throw new DoNotRetryIOException(
|
||||
"Action must be Put or Delete. But was: "
|
||||
+ m.getClass().getName());
|
||||
}
|
||||
if (m.getWriteToWAL()) {
|
||||
addFamilyMapToWALEdit(m.getFamilyMap(), walEdit);
|
||||
}
|
||||
}
|
||||
|
||||
// 6. append all edits at once (don't sync)
|
||||
if (walEdit.size() > 0) {
|
||||
txid = this.log.appendNoSync(regionInfo,
|
||||
this.htableDescriptor.getName(), walEdit,
|
||||
HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
|
||||
}
|
||||
|
||||
// 7. apply to memstore
|
||||
long addedSize = 0;
|
||||
for (Mutation m : mutations) {
|
||||
addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
|
||||
}
|
||||
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
|
||||
|
||||
// 8. release region and row lock(s)
|
||||
this.updatesLock.readLock().unlock();
|
||||
locked = false;
|
||||
if (acquiredLocks != null) {
|
||||
for (Integer lid : acquiredLocks) {
|
||||
releaseRowLock(lid);
|
||||
}
|
||||
acquiredLocks = null;
|
||||
}
|
||||
|
||||
// 9. sync WAL if required
|
||||
if (walEdit.size() > 0 &&
|
||||
(this.regionInfo.isMetaRegion() ||
|
||||
!this.htableDescriptor.isDeferredLogFlush())) {
|
||||
this.log.sync(txid);
|
||||
}
|
||||
walSyncSuccessful = true;
|
||||
|
||||
// 10. advance mvcc
|
||||
mvcc.completeMemstoreInsert(w);
|
||||
w = null;
|
||||
|
||||
// 11. run coprocessor post host hooks
|
||||
// after the WAL is sync'ed and all locks are released
|
||||
// (similar to doMiniBatchPut)
|
||||
if (coprocessorHost != null) {
|
||||
for (Mutation m : mutations) {
|
||||
if (m instanceof Put) {
|
||||
coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
|
||||
} else if (m instanceof Delete) {
|
||||
coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// 12. clean up if needed
|
||||
if (!walSyncSuccessful) {
|
||||
int kvsRolledback = 0;
|
||||
for (Mutation m : mutations) {
|
||||
for (Map.Entry<byte[], List<KeyValue>> e : m.getFamilyMap()
|
||||
.entrySet()) {
|
||||
List<KeyValue> kvs = e.getValue();
|
||||
byte[] family = e.getKey();
|
||||
Store store = getStore(family);
|
||||
// roll back each kv
|
||||
for (KeyValue kv : kvs) {
|
||||
store.rollback(kv);
|
||||
kvsRolledback++;
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG.info("mutateRowWithLocks: rolled back " + kvsRolledback
|
||||
+ " KeyValues");
|
||||
}
|
||||
|
||||
if (w != null) {
|
||||
mvcc.completeMemstoreInsert(w);
|
||||
}
|
||||
|
||||
if (locked) {
|
||||
this.updatesLock.readLock().unlock();
|
||||
}
|
||||
|
||||
if (acquiredLocks != null) {
|
||||
for (Integer lid : acquiredLocks) {
|
||||
releaseRowLock(lid);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (flush) {
|
||||
// 13. Flush cache if needed. Do it outside update lock.
|
||||
requestFlush();
|
||||
}
|
||||
closeRegionOperation();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs atomic multiple reads and writes on a given row.
|
||||
* @param processor The object defines the reads and writes to a row.
|
||||
*/
|
||||
public void processRow(RowProcessor<?> processor)
|
||||
throws IOException {
|
||||
byte[] row = processor.getRow();
|
||||
checkRow(row, "processRow");
|
||||
if (!processor.readOnly()) {
|
||||
checkReadOnly();
|
||||
}
|
||||
checkResources();
|
||||
|
||||
MultiVersionConsistencyControl.WriteEntry writeEntry = null;
|
||||
|
||||
startRegionOperation();
|
||||
|
||||
boolean locked = false;
|
||||
boolean walSyncSuccessful = false;
|
||||
Integer rowLockID = null;
|
||||
long addedSize = 0;
|
||||
List<KeyValue> mutations = new ArrayList<KeyValue>();
|
||||
try {
|
||||
// 1. Row lock
|
||||
rowLockID = getLock(null, row, true);
|
||||
|
||||
// 2. Region lock
|
||||
this.updatesLock.readLock().lock();
|
||||
locked = true;
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
try {
|
||||
// 3. Let the processor scan the row and generate mutations
|
||||
WALEdit walEdits = new WALEdit();
|
||||
doProcessRowWithTimeout(processor, now, rowScanner, mutations,
|
||||
walEdits, rowProcessorTimeout);
|
||||
if (processor.readOnly() && !mutations.isEmpty()) {
|
||||
throw new IOException(
|
||||
"Processor is readOnly but generating mutations on row:" +
|
||||
Bytes.toStringBinary(row));
|
||||
}
|
||||
// 4. Let the processor scan the rows, generate mutations and add
|
||||
// waledits
|
||||
doProcessRowWithTimeout(
|
||||
processor, now, this, mutations, walEdit, timeout);
|
||||
|
||||
if (!mutations.isEmpty()) {
|
||||
// 4. Get a mvcc write number
|
||||
// 5. Get a mvcc write number
|
||||
writeEntry = mvcc.beginMemstoreInsert();
|
||||
// 5. Apply to memstore and a WALEdit
|
||||
// 6. Apply to memstore
|
||||
for (KeyValue kv : mutations) {
|
||||
kv.setMemstoreTS(writeEntry.getWriteNumber());
|
||||
walEdits.add(kv);
|
||||
addedSize += stores.get(kv.getFamily()).add(kv);
|
||||
byte[] family = kv.getFamily();
|
||||
checkFamily(family);
|
||||
addedSize += stores.get(family).add(kv);
|
||||
}
|
||||
|
||||
long txid = 0;
|
||||
// 6. Append no sync
|
||||
if (!walEdits.isEmpty()) {
|
||||
// 7. Append no sync
|
||||
if (!walEdit.isEmpty()) {
|
||||
txid = this.log.appendNoSync(this.regionInfo,
|
||||
this.htableDescriptor.getName(), walEdits,
|
||||
this.htableDescriptor.getName(), walEdit,
|
||||
processor.getClusterId(), now, this.htableDescriptor);
|
||||
}
|
||||
// 7. Release region lock
|
||||
// 8. Release region lock
|
||||
if (locked) {
|
||||
this.updatesLock.readLock().unlock();
|
||||
locked = false;
|
||||
}
|
||||
// 8. Release row lock
|
||||
if (rowLockID != null) {
|
||||
releaseRowLock(rowLockID);
|
||||
rowLockID = null;
|
||||
// 9. Release row lock(s)
|
||||
if (acquiredLocks != null) {
|
||||
for (Integer lid : acquiredLocks) {
|
||||
releaseRowLock(lid);
|
||||
}
|
||||
acquiredLocks = null;
|
||||
}
|
||||
// 9. Sync edit log
|
||||
if (txid != 0) {
|
||||
// 10. Sync edit log
|
||||
if (txid != 0 &&
|
||||
(this.regionInfo.isMetaRegion() ||
|
||||
!this.htableDescriptor.isDeferredLogFlush())) {
|
||||
this.log.sync(txid);
|
||||
}
|
||||
walSyncSuccessful = true;
|
||||
|
@ -4476,12 +4365,13 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
} finally {
|
||||
if (!mutations.isEmpty() && !walSyncSuccessful) {
|
||||
LOG.warn("Wal sync failed. Roll back " + mutations.size() +
|
||||
" memstore keyvalues for row:" + processor.getRow());
|
||||
" memstore keyvalues for row(s):" +
|
||||
processor.getRowsToLock().iterator().next() + "...");
|
||||
for (KeyValue kv : mutations) {
|
||||
stores.get(kv.getFamily()).rollback(kv);
|
||||
}
|
||||
}
|
||||
// 10. Roll mvcc forward
|
||||
// 11. Roll mvcc forward
|
||||
if (writeEntry != null) {
|
||||
mvcc.completeMemstoreInsert(writeEntry);
|
||||
writeEntry = null;
|
||||
|
@ -4490,11 +4380,16 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
this.updatesLock.readLock().unlock();
|
||||
locked = false;
|
||||
}
|
||||
if (rowLockID != null) {
|
||||
releaseRowLock(rowLockID);
|
||||
rowLockID = null;
|
||||
if (acquiredLocks != null) {
|
||||
for (Integer lid : acquiredLocks) {
|
||||
releaseRowLock(lid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 12. Run post-process hook
|
||||
processor.postProcess(this, walEdit);
|
||||
|
||||
} finally {
|
||||
closeRegionOperation();
|
||||
if (!mutations.isEmpty() &&
|
||||
|
@ -4506,48 +4401,54 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
private void doProcessRowWithTimeout(final RowProcessor<?> processor,
|
||||
final long now,
|
||||
final RowProcessor.RowScanner scanner,
|
||||
final HRegion region,
|
||||
final List<KeyValue> mutations,
|
||||
final WALEdit walEdits,
|
||||
final WALEdit walEdit,
|
||||
final long timeout) throws IOException {
|
||||
// Short circuit the no time bound case.
|
||||
if (timeout < 0) {
|
||||
try {
|
||||
processor.process(now, region, mutations, walEdit);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("RowProcessor:" + processor.getClass().getName() +
|
||||
" throws Exception on row(s):" +
|
||||
Bytes.toStringBinary(
|
||||
processor.getRowsToLock().iterator().next()) + "...", e);
|
||||
throw e;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Case with time bound
|
||||
FutureTask<Void> task =
|
||||
new FutureTask<Void>(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
processor.process(now, scanner, mutations, walEdits);
|
||||
return null;
|
||||
try {
|
||||
processor.process(now, region, mutations, walEdit);
|
||||
return null;
|
||||
} catch (IOException e) {
|
||||
LOG.warn("RowProcessor:" + processor.getClass().getName() +
|
||||
" throws Exception on row(s):" +
|
||||
Bytes.toStringBinary(
|
||||
processor.getRowsToLock().iterator().next()) + "...", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
});
|
||||
Thread t = new Thread(task);
|
||||
t.setDaemon(true);
|
||||
t.start();
|
||||
rowProcessorExecutor.execute(task);
|
||||
try {
|
||||
task.get(timeout, TimeUnit.MILLISECONDS);
|
||||
} catch (TimeoutException te) {
|
||||
LOG.error("RowProcessor timeout on row:" +
|
||||
Bytes.toStringBinary(processor.getRow()) + " timeout:" + timeout, te);
|
||||
LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
|
||||
Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
|
||||
"...");
|
||||
throw new IOException(te);
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
final private RowProcessor.RowScanner rowScanner =
|
||||
new RowProcessor.RowScanner() {
|
||||
@Override
|
||||
public void doScan(Scan scan, List<KeyValue> result) throws IOException {
|
||||
InternalScanner scanner = null;
|
||||
try {
|
||||
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
|
||||
scanner = HRegion.this.getScanner(scan);
|
||||
result.clear();
|
||||
scanner.next(result);
|
||||
} finally {
|
||||
if (scanner != null) scanner.close();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: There's a lot of boiler plate code identical
|
||||
// to increment... See how to better unify that.
|
||||
/**
|
||||
|
|
|
@ -138,40 +138,45 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
LOG.info("SplitLogWorker " + this.serverName + " starting");
|
||||
this.watcher.registerListener(this);
|
||||
int res;
|
||||
// wait for master to create the splitLogZnode
|
||||
res = -1;
|
||||
while (res == -1) {
|
||||
try {
|
||||
res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
|
||||
} catch (KeeperException e) {
|
||||
// ignore
|
||||
LOG.warn("Exception when checking for " + watcher.splitLogZNode +
|
||||
" ... retrying", e);
|
||||
}
|
||||
if (res == -1) {
|
||||
try {
|
||||
LOG.info("SplitLogWorker " + this.serverName + " starting");
|
||||
this.watcher.registerListener(this);
|
||||
int res;
|
||||
// wait for master to create the splitLogZnode
|
||||
res = -1;
|
||||
while (res == -1 && !exitWorker) {
|
||||
try {
|
||||
LOG.info(watcher.splitLogZNode + " znode does not exist," +
|
||||
" waiting for master to create one");
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode);
|
||||
assert exitWorker == true;
|
||||
res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
|
||||
} catch (KeeperException e) {
|
||||
// ignore
|
||||
LOG.warn("Exception when checking for " + watcher.splitLogZNode +
|
||||
" ... retrying", e);
|
||||
}
|
||||
if (res == -1) {
|
||||
try {
|
||||
LOG.info(watcher.splitLogZNode + " znode does not exist," +
|
||||
" waiting for master to create one");
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode
|
||||
+ (exitWorker ? "" : " (ERROR: exitWorker is not set, " +
|
||||
"exiting anyway)"));
|
||||
exitWorker = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taskLoop();
|
||||
} catch (Throwable t) {
|
||||
// only a logical error can cause here. Printing it out
|
||||
// to make debugging easier
|
||||
LOG.error("unexpected error ", t);
|
||||
} finally {
|
||||
LOG.info("SplitLogWorker " + this.serverName + " exiting");
|
||||
}
|
||||
if (!exitWorker) {
|
||||
taskLoop();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
// only a logical error can cause here. Printing it out
|
||||
// to make debugging easier
|
||||
LOG.error("unexpected error ", t);
|
||||
} finally {
|
||||
LOG.info("SplitLogWorker " + this.serverName + " exiting");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -183,7 +188,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
|
|||
* try to grab every task that has been put up
|
||||
*/
|
||||
private void taskLoop() {
|
||||
while (true) {
|
||||
while (!exitWorker) {
|
||||
int seq_start = taskReadySeq;
|
||||
List<String> paths = getTaskList();
|
||||
if (paths == null) {
|
||||
|
@ -197,7 +202,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
|
|||
// don't call ZKSplitLog.getNodeName() because that will lead to
|
||||
// double encoding of the path name
|
||||
grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
|
||||
if (exitWorker == true) {
|
||||
if (exitWorker) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -207,8 +212,9 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
|
|||
taskReadyLock.wait();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("SplitLogWorker interrupted while waiting for task," +
|
||||
" exiting: " + e.toString());
|
||||
assert exitWorker == true;
|
||||
" exiting: " + e.toString() + (exitWorker ? "" :
|
||||
" (ERROR: exitWorker is not set, exiting anyway)"));
|
||||
exitWorker = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue