From 536ff218258177d923e8771b44fce2420dc4c9e5 Mon Sep 17 00:00:00 2001 From: larsh Date: Thu, 22 Mar 2012 17:51:37 +0000 Subject: [PATCH] HBASE-5542 Unify HRegion.mutateRowsWithLocks() and HRegion.processRow() (Scott Chen) part 2 git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1303920 13f79535-47bb-0310-9956-ffa450edef68 --- .../coprocessor/BaseRowProcessorEndpoint.java | 57 ++ .../coprocessor/RowProcessorProtocol.java | 41 ++ .../hbase/regionserver/BaseRowProcessor.java | 49 ++ .../MultiRowMutationProcessor.java | 126 ++++ .../RowProcessor.java | 65 +- .../regionserver/SplitLogWorker.java.orig | 564 +++++++++++++++++ .../coprocessor/TestProcessRowEndpoint.java | 321 ---------- .../coprocessor/TestRowProcessorEndpoint.java | 598 ++++++++++++++++++ 8 files changed, 1478 insertions(+), 343 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessorProtocol.java create mode 100644 src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java create mode 100644 src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java rename src/main/java/org/apache/hadoop/hbase/{coprocessor => regionserver}/RowProcessor.java (51%) create mode 100644 src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java.orig delete mode 100644 src/test/java/org/apache/hadoop/hbase/coprocessor/TestProcessRowEndpoint.java create mode 100644 src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java new file mode 100644 index 00000000000..9ee63ffa94d --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RowProcessor; + +/** + * This class demonstrates how to implement atomic read-modify-writes + * using {@link HRegion#processRowsWithLocks()} and Coprocessor endpoints. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public abstract class BaseRowProcessorEndpoint extends BaseEndpointCoprocessor + implements RowProcessorProtocol { + + /** + * Pass a processor to HRegion to process multiple rows atomically. 
+ * + * The RowProcessor implementations should be the inner classes of your + * RowProcessorEndpoint. This way the RowProcessor can be class-loaded with + * the Coprocessor endpoint together. + * + * See {@link TestRowProcessorEndpoint} for example. + * + * @param processor The object defines the read-modify-write procedure + * @return The processing result + */ + @Override + public T process(RowProcessor processor) + throws IOException { + HRegion region = + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion(); + region.processRowsWithLocks(processor); + return processor.getResult(); + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessorProtocol.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessorProtocol.java new file mode 100644 index 00000000000..c670c39fdd8 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessorProtocol.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RowProcessor; + +/** + * Defines a protocol to perform multi row transactions. + * See {@link BaseRowProcessorEndpoint} for the implementation. + * See {@link HRegion#processRowsWithLocks()} for detials. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface RowProcessorProtocol extends CoprocessorProtocol { + + /** + * @param processor The processor defines how to process the row + */ + T process(RowProcessor processor) throws IOException; +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java b/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java new file mode 100644 index 00000000000..c407f98bde5 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; + +/** + * Base class for RowProcessor with some default implementations. + */ +public abstract class BaseRowProcessor implements RowProcessor { + + @Override + public T getResult() { + return null; + } + + @Override + public void preProcess(HRegion region, WALEdit walEdit) throws IOException { + } + + @Override + public void postProcess(HRegion region, WALEdit walEdit) throws IOException { + } + + @Override + public UUID getClusterId() { + return HConstants.DEFAULT_CLUSTER_ID; + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java new file mode 100644 index 00000000000..208de5ca86c --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A MultiRowProcessor that performs multiple puts and deletes. 
+ */ +class MultiRowMutationProcessor extends BaseRowProcessor { + Collection rowsToLock; + Collection mutations; + + MultiRowMutationProcessor(Collection mutations, + Collection rowsToLock) { + this.rowsToLock = rowsToLock; + this.mutations = mutations; + } + + @Override + public Collection getRowsToLock() { + return rowsToLock; + } + + @Override + public boolean readOnly() { + return false; + } + + @Override + public void process(long now, + HRegion region, + List mutationKvs, + WALEdit walEdit) throws IOException { + byte[] byteNow = Bytes.toBytes(now); + // Check mutations and apply edits to a single WALEdit + for (Mutation m : mutations) { + if (m instanceof Put) { + Map> familyMap = m.getFamilyMap(); + region.checkFamilies(familyMap.keySet()); + region.checkTimestamps(familyMap, now); + region.updateKVTimestamps(familyMap.values(), byteNow); + } else if (m instanceof Delete) { + Delete d = (Delete) m; + region.prepareDelete(d); + region.prepareDeleteTimestamps(d, byteNow); + } else { + throw new DoNotRetryIOException( + "Action must be Put or Delete. But was: " + + m.getClass().getName()); + } + for (List edits : m.getFamilyMap().values()) { + boolean writeToWAL = m.getWriteToWAL(); + for (KeyValue kv : edits) { + mutationKvs.add(kv); + if (writeToWAL) { + walEdit.add(kv); + } + } + } + } + } + + @Override + public void preProcess(HRegion region, WALEdit walEdit) throws IOException { + RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); + if (coprocessorHost != null) { + for (Mutation m : mutations) { + if (m instanceof Put) { + if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) { + // by pass everything + return; + } + } else if (m instanceof Delete) { + Delete d = (Delete) m; + region.prepareDelete(d); + if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) { + // by pass everything + return; + } + } + } + } + } + + @Override + public void postProcess(HRegion region, WALEdit walEdit) throws IOException { + RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); + if (coprocessorHost != null) { + for (Mutation m : mutations) { + if (m instanceof Put) { + coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL()); + } else if (m instanceof Delete) { + coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL()); + } + } + } + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessor.java b/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java similarity index 51% rename from src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessor.java rename to src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java index 75c7768a37f..15c67f26949 100644 --- a/src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessor.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java @@ -15,30 +15,40 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hbase.coprocessor; +package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.UUID; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.io.Writable; + +@InterfaceAudience.Public +@InterfaceStability.Evolving /** * Defines the procedure to atomically perform multiple scans and mutations - * on one single row. The generic type parameter T is the return type of + * on a HRegion. + * + * This is invoked by {@link HRegion#processRowsWithLocks()}. + * This class performs scans and generates mutations and WAL edits. + * The locks and MVCC will be handled by HRegion. + * + * The generic type parameter T is the return type of * RowProcessor.getResult(). */ -@InterfaceAudience.Public -public interface RowProcessor extends Writable { +public interface RowProcessor { /** - * Which row to perform the read-write + * Rows to lock while operation. + * They have to be sorted with RowProcessor + * to avoid deadlock. */ - byte[] getRow(); + Collection getRowsToLock(); /** * Obtain the processing result @@ -53,29 +63,40 @@ public interface RowProcessor extends Writable { boolean readOnly(); /** - * HRegion calls this to process a row. You should override this to create - * your own RowProcessor. + * HRegion handles the locks and MVCC and invokes this method properly. + * + * You should override this to create your own RowProcessor. + * + * If you are doing read-modify-write here, you should consider using + * IsolationLevel.READ_UNCOMMITTED for scan because + * we advance MVCC after releasing the locks for optimization purpose. * * @param now the current system millisecond - * @param scanner the call back object the can be used to scan the row - * @param mutations the mutations for HRegion to do - * @param walEdit the wal edit here allows inject some other meta data + * @param region the HRegion + * @param mutations the output mutations to apply to memstore + * @param walEdit the output WAL edits to apply to write ahead log */ void process(long now, - RowProcessor.RowScanner scanner, + HRegion region, List mutations, WALEdit walEdit) throws IOException; /** - * The call back provided by HRegion to perform the scans on the row + * The hook to be executed before process(). + * + * @param region the HRegion + * @param walEdit the output WAL edits to apply to write ahead log */ - public interface RowScanner { - /** - * @param scan The object defines what to read - * @param result The scan results will be added here - */ - void doScan(Scan scan, List result) throws IOException; - } + void preProcess(HRegion region, WALEdit walEdit) throws IOException; + + /** + * The hook to be executed after process(). + * + * @param region the HRegion + * @param walEdit the output WAL edits to apply to write ahead log + */ + void postProcess(HRegion region, WALEdit walEdit) throws IOException; + /** * @return The replication cluster id. 
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java.orig b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java.orig new file mode 100644 index 00000000000..2a0284a6cf7 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java.orig @@ -0,0 +1,564 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +/** + * This worker is spawned in every regionserver (should we also spawn one in + * the master?). The Worker waits for log splitting tasks to be put up by the + * {@link SplitLogManager} running in the master and races with other workers + * in other serves to acquire those tasks. The coordination is done via + * zookeeper. All the action takes place at /hbase/splitlog znode. + *
<p>
+ * If a worker has successfully moved the task from state UNASSIGNED to + * OWNED then it owns the task. It keeps heart beating the manager by + * periodically moving the task from UNASSIGNED to OWNED state. On success it + * moves the task to TASK_DONE. On unrecoverable error it moves task state to + * ERR. If it cannot continue but wants the master to retry the task then it + * moves the task state to RESIGNED. + *
<p>
+ * The manager can take a task away from a worker by moving the task from + * OWNED to UNASSIGNED. In the absence of a global lock there is a + * unavoidable race here - a worker might have just finished its task when it + * is stripped of its ownership. Here we rely on the idempotency of the log + * splitting task for correctness + */ +@InterfaceAudience.Private +public class SplitLogWorker extends ZooKeeperListener implements Runnable { + private static final Log LOG = LogFactory.getLog(SplitLogWorker.class); + + Thread worker; + private final String serverName; + private final TaskExecutor splitTaskExecutor; + private long zkretries; + + private Object taskReadyLock = new Object(); + volatile int taskReadySeq = 0; + private volatile String currentTask = null; + private int currentVersion; + private volatile boolean exitWorker; + private Object grabTaskLock = new Object(); + private boolean workerInGrabTask = false; + + + public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, + String serverName, TaskExecutor splitTaskExecutor) { + super(watcher); + this.serverName = serverName; + this.splitTaskExecutor = splitTaskExecutor; + this.zkretries = conf.getLong("hbase.splitlog.zk.retries", 3); + } + + public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf, + final String serverName) { + this(watcher, conf, serverName, new TaskExecutor () { + @Override + public Status exec(String filename, CancelableProgressable p) { + Path rootdir; + FileSystem fs; + try { + rootdir = FSUtils.getRootDir(conf); + fs = rootdir.getFileSystem(conf); + } catch (IOException e) { + LOG.warn("could not find root dir or fs", e); + return Status.RESIGNED; + } + // TODO have to correctly figure out when log splitting has been + // interrupted or has encountered a transient error and when it has + // encountered a bad non-retry-able persistent error. + try { + String tmpname = + ZKSplitLog.getSplitLogDirTmpComponent(serverName, filename); + if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname, + fs.getFileStatus(new Path(filename)), fs, conf, p) == false) { + return Status.PREEMPTED; + } + } catch (InterruptedIOException iioe) { + LOG.warn("log splitting of " + filename + " interrupted, resigning", + iioe); + return Status.RESIGNED; + } catch (IOException e) { + Throwable cause = e.getCause(); + if (cause instanceof InterruptedException) { + LOG.warn("log splitting of " + filename + " interrupted, resigning", + e); + return Status.RESIGNED; + } + LOG.warn("log splitting of " + filename + " failed, returning error", + e); + return Status.ERR; + } + return Status.DONE; + } + }); + } + + @Override + public void run() { + try { + LOG.info("SplitLogWorker " + this.serverName + " starting"); + this.watcher.registerListener(this); + int res; + // wait for master to create the splitLogZnode + res = -1; + while (res == -1) { + try { + res = ZKUtil.checkExists(watcher, watcher.splitLogZNode); + } catch (KeeperException e) { + // ignore + LOG.warn("Exception when checking for " + watcher.splitLogZNode + + " ... retrying", e); + } + if (res == -1) { + try { + LOG.info(watcher.splitLogZNode + " znode does not exist," + + " waiting for master to create one"); + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode); + assert exitWorker == true; + } + } + } + + taskLoop(); + } catch (Throwable t) { + // only a logical error can cause here. 
Printing it out + // to make debugging easier + LOG.error("unexpected error ", t); + } finally { + LOG.info("SplitLogWorker " + this.serverName + " exiting"); + } + } + + /** + * Wait for tasks to become available at /hbase/splitlog znode. Grab a task + * one at a time. This policy puts an upper limit on the number of + * simultaneous log splitting tasks that could be happening in a cluster. + *
<p>
+ * Synchronization using {@link #task_ready_signal_seq} ensures that it will + * try to grab every task that has been put up + */ + private void taskLoop() { + while (true) { + int seq_start = taskReadySeq; + List paths = getTaskList(); + if (paths == null) { + LOG.warn("Could not get tasks, did someone remove " + + this.watcher.splitLogZNode + " ... worker thread exiting."); + return; + } + int offset = (int)(Math.random() * paths.size()); + for (int i = 0; i < paths.size(); i ++) { + int idx = (i + offset) % paths.size(); + // don't call ZKSplitLog.getNodeName() because that will lead to + // double encoding of the path name + grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx))); + if (exitWorker == true) { + return; + } + } + synchronized (taskReadyLock) { + while (seq_start == taskReadySeq) { + try { + taskReadyLock.wait(); + } catch (InterruptedException e) { + LOG.info("SplitLogWorker interrupted while waiting for task," + + " exiting: " + e.toString()); + assert exitWorker == true; + return; + } + } + } + } + } + + /** + * try to grab a 'lock' on the task zk node to own and execute the task. + *
<p>
+ * @param path zk node for the task + */ + private void grabTask(String path) { + Stat stat = new Stat(); + long t = -1; + byte[] data; + synchronized (grabTaskLock) { + currentTask = path; + workerInGrabTask = true; + if (Thread.interrupted()) { + return; + } + } + try { + try { + if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) { + tot_wkr_failed_to_grab_task_no_data.incrementAndGet(); + return; + } + } catch (KeeperException e) { + LOG.warn("Failed to get data for znode " + path, e); + tot_wkr_failed_to_grab_task_exception.incrementAndGet(); + return; + } + if (TaskState.TASK_UNASSIGNED.equals(data) == false) { + tot_wkr_failed_to_grab_task_owned.incrementAndGet(); + return; + } + + currentVersion = stat.getVersion(); + if (attemptToOwnTask(true) == false) { + tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); + return; + } + + if (ZKSplitLog.isRescanNode(watcher, currentTask)) { + endTask(TaskState.TASK_DONE, tot_wkr_task_acquired_rescan); + return; + } + LOG.info("worker " + serverName + " acquired task " + path); + tot_wkr_task_acquired.incrementAndGet(); + getDataSetWatchAsync(); + + t = System.currentTimeMillis(); + TaskExecutor.Status status; + + status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask), + new CancelableProgressable() { + + @Override + public boolean progress() { + if (attemptToOwnTask(false) == false) { + LOG.warn("Failed to heartbeat the task" + currentTask); + return false; + } + return true; + } + }); + switch (status) { + case DONE: + endTask(TaskState.TASK_DONE, tot_wkr_task_done); + break; + case PREEMPTED: + tot_wkr_preempt_task.incrementAndGet(); + LOG.warn("task execution prempted " + path); + break; + case ERR: + if (!exitWorker) { + endTask(TaskState.TASK_ERR, tot_wkr_task_err); + break; + } + // if the RS is exiting then there is probably a tons of stuff + // that can go wrong. Resign instead of signaling error. + //$FALL-THROUGH$ + case RESIGNED: + if (exitWorker) { + LOG.info("task execution interrupted because worker is exiting " + + path); + endTask(TaskState.TASK_RESIGNED, tot_wkr_task_resigned); + } else { + tot_wkr_preempt_task.incrementAndGet(); + LOG.info("task execution interrupted via zk by manager " + + path); + } + break; + } + } finally { + if (t > 0) { + LOG.info("worker " + serverName + " done with task " + path + + " in " + (System.currentTimeMillis() - t) + "ms"); + } + synchronized (grabTaskLock) { + workerInGrabTask = false; + // clear the interrupt from stopTask() otherwise the next task will + // suffer + Thread.interrupted(); + } + } + return; + } + + /** + * Try to own the task by transitioning the zk node data from UNASSIGNED to + * OWNED. + *
<p>
+ * This method is also used to periodically heartbeat the task progress by + * transitioning the node from OWNED to OWNED. + *
<p>
+ * @return true if task path is successfully locked + */ + private boolean attemptToOwnTask(boolean isFirstTime) { + try { + Stat stat = this.watcher.getRecoverableZooKeeper().setData(currentTask, + TaskState.TASK_OWNED.get(serverName), currentVersion); + if (stat == null) { + LOG.warn("zk.setData() returned null for path " + currentTask); + tot_wkr_task_heartbeat_failed.incrementAndGet(); + return (false); + } + currentVersion = stat.getVersion(); + tot_wkr_task_heartbeat.incrementAndGet(); + return (true); + } catch (KeeperException e) { + if (!isFirstTime) { + if (e.code().equals(KeeperException.Code.NONODE)) { + LOG.warn("NONODE failed to assert ownership for " + currentTask, e); + } else if (e.code().equals(KeeperException.Code.BADVERSION)) { + LOG.warn("BADVERSION failed to assert ownership for " + + currentTask, e); + } else { + LOG.warn("failed to assert ownership for " + currentTask, e); + } + } + } catch (InterruptedException e1) { + LOG.warn("Interrupted while trying to assert ownership of " + + currentTask + " " + StringUtils.stringifyException(e1)); + Thread.currentThread().interrupt(); + } + tot_wkr_task_heartbeat_failed.incrementAndGet(); + return (false); + } + + /** + * endTask() can fail and the only way to recover out of it is for the + * {@link SplitLogManager} to timeout the task node. + * @param ts + * @param ctr + */ + private void endTask(ZKSplitLog.TaskState ts, AtomicLong ctr) { + String path = currentTask; + currentTask = null; + try { + if (ZKUtil.setData(this.watcher, path, ts.get(serverName), + currentVersion)) { + LOG.info("successfully transitioned task " + path + + " to final state " + ts); + ctr.incrementAndGet(); + return; + } + LOG.warn("failed to transistion task " + path + " to end state " + ts + + " because of version mismatch "); + } catch (KeeperException.BadVersionException bve) { + LOG.warn("transisition task " + path + " to " + ts + + " failed because of version mismatch", bve); + } catch (KeeperException.NoNodeException e) { + LOG.fatal("logic error - end task " + path + " " + ts + + " failed because task doesn't exist", e); + } catch (KeeperException e) { + LOG.warn("failed to end task, " + path + " " + ts, e); + } + tot_wkr_final_transistion_failed.incrementAndGet(); + return; + } + + void getDataSetWatchAsync() { + this.watcher.getRecoverableZooKeeper().getZooKeeper(). + getData(currentTask, this.watcher, + new GetDataAsyncCallback(), null); + tot_wkr_get_data_queued.incrementAndGet(); + } + + void getDataSetWatchSuccess(String path, byte[] data) { + synchronized (grabTaskLock) { + if (workerInGrabTask) { + // currentTask can change but that's ok + String taskpath = currentTask; + if (taskpath != null && taskpath.equals(path)) { + // have to compare data. cannot compare version because then there + // will be race with attemptToOwnTask() + // cannot just check whether the node has been transitioned to + // UNASSIGNED because by the time this worker sets the data watch + // the node might have made two transitions - from owned by this + // worker to unassigned to owned by another worker + if (! TaskState.TASK_OWNED.equals(data, serverName) && + ! TaskState.TASK_DONE.equals(data, serverName) && + ! TaskState.TASK_ERR.equals(data, serverName) && + ! 
TaskState.TASK_RESIGNED.equals(data, serverName)) { + LOG.info("task " + taskpath + " preempted from " + + serverName + ", current task state and owner=" + + new String(data)); + stopTask(); + } + } + } + } + } + + void getDataSetWatchFailure(String path) { + synchronized (grabTaskLock) { + if (workerInGrabTask) { + // currentTask can change but that's ok + String taskpath = currentTask; + if (taskpath != null && taskpath.equals(path)) { + LOG.info("retrying data watch on " + path); + tot_wkr_get_data_retry.incrementAndGet(); + getDataSetWatchAsync(); + } else { + // no point setting a watch on the task which this worker is not + // working upon anymore + } + } + } + } + + + + + @Override + public void nodeDataChanged(String path) { + // there will be a self generated dataChanged event every time attemptToOwnTask() + // heartbeats the task znode by upping its version + synchronized (grabTaskLock) { + if (workerInGrabTask) { + // currentTask can change + String taskpath = currentTask; + if (taskpath!= null && taskpath.equals(path)) { + getDataSetWatchAsync(); + } + } + } + } + + + private List getTaskList() { + for (int i = 0; i < zkretries; i++) { + try { + return (ZKUtil.listChildrenAndWatchForNewChildren(this.watcher, + this.watcher.splitLogZNode)); + } catch (KeeperException e) { + LOG.warn("Could not get children of znode " + + this.watcher.splitLogZNode, e); + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + LOG.warn("Interrupted while trying to get task list ...", e1); + Thread.currentThread().interrupt(); + return null; + } + } + } + LOG.warn("Tried " + zkretries + " times, still couldn't fetch " + + "children of " + watcher.splitLogZNode + " giving up"); + return null; + } + + + @Override + public void nodeChildrenChanged(String path) { + if(path.equals(watcher.splitLogZNode)) { + LOG.debug("tasks arrived or departed"); + synchronized (taskReadyLock) { + taskReadySeq++; + taskReadyLock.notify(); + } + } + } + + /** + * If the worker is doing a task i.e. splitting a log file then stop the task. + * It doesn't exit the worker thread. + */ + void stopTask() { + LOG.info("Sending interrupt to stop the worker thread"); + worker.interrupt(); // TODO interrupt often gets swallowed, do what else? + } + + + /** + * start the SplitLogWorker thread + */ + public void start() { + worker = new Thread(null, this, "SplitLogWorker-" + serverName); + exitWorker = false; + worker.start(); + return; + } + + /** + * stop the SplitLogWorker thread + */ + public void stop() { + exitWorker = true; + stopTask(); + } + + /** + * Asynchronous handler for zk get-data-set-watch on node results. + */ + class GetDataAsyncCallback implements AsyncCallback.DataCallback { + private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, + Stat stat) { + tot_wkr_get_data_result.incrementAndGet(); + if (rc != 0) { + LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path); + getDataSetWatchFailure(path); + return; + } + data = watcher.getRecoverableZooKeeper().removeMetaData(data); + getDataSetWatchSuccess(path, data); + return; + } + } + + /** + * Objects implementing this interface actually do the task that has been + * acquired by a {@link SplitLogWorker}. 
Since there isn't a water-tight + * guarantee that two workers will not be executing the same task therefore it + * is better to have workers prepare the task and then have the + * {@link SplitLogManager} commit the work in SplitLogManager.TaskFinisher + */ + static public interface TaskExecutor { + static public enum Status { + DONE(), + ERR(), + RESIGNED(), + PREEMPTED(); + } + public Status exec(String name, CancelableProgressable p); + } +} diff --git a/src/test/java/org/apache/hadoop/hbase/coprocessor/TestProcessRowEndpoint.java b/src/test/java/org/apache/hadoop/hbase/coprocessor/TestProcessRowEndpoint.java deleted file mode 100644 index a4e495e5ecb..00000000000 --- a/src/test/java/org/apache/hadoop/hbase/coprocessor/TestProcessRowEndpoint.java +++ /dev/null @@ -1,321 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.coprocessor; - -import static org.junit.Assert.assertEquals; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.SmallTests; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.sun.org.apache.commons.logging.Log; -import com.sun.org.apache.commons.logging.LogFactory; - -/** - * Verifies ProcessRowEndpoint works. - * The tested RowProcessor performs two scans and a read-modify-write. 
- */ -@Category(SmallTests.class) -public class TestProcessRowEndpoint { - - static final Log LOG = LogFactory.getLog(TestProcessRowEndpoint.class); - - private static final byte[] TABLE = Bytes.toBytes("testtable"); - private static final byte[] TABLE2 = Bytes.toBytes("testtable2"); - private final static byte[] ROW = Bytes.toBytes("testrow"); - private final static byte[] FAM = Bytes.toBytes("friendlist"); - - // Column names - private final static byte[] A = Bytes.toBytes("a"); - private final static byte[] B = Bytes.toBytes("b"); - private final static byte[] C = Bytes.toBytes("c"); - private final static byte[] D = Bytes.toBytes("d"); - private final static byte[] E = Bytes.toBytes("e"); - private final static byte[] F = Bytes.toBytes("f"); - private final static byte[] G = Bytes.toBytes("g"); - private final static byte[] REQUESTS = Bytes.toBytes("requests"); - - private static HBaseTestingUtility util = new HBaseTestingUtility(); - private volatile int numRequests; - - private CountDownLatch startSignal; - private CountDownLatch doneSignal; - - @BeforeClass - public static void setupBeforeClass() throws Exception { - Configuration conf = util.getConfiguration(); - conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, - FriendsOfFriendsEndpoint.class.getName()); - util.startMiniCluster(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - util.shutdownMiniCluster(); - } - - @Test - public void testSingle() throws Throwable { - HTable table = prepareTestData(TABLE, util); - verifyProcessRow(table); - assertEquals(1, numRequests); - } - - private void verifyProcessRow(HTable table) throws Throwable { - - FriendsOfFriendsProtocol processor = - table.coprocessorProxy(FriendsOfFriendsProtocol.class, ROW); - Result result = processor.query(ROW, A); - - Set friendsOfFriends = new HashSet(); - for (KeyValue kv : result.raw()) { - if (Bytes.equals(kv.getQualifier(), REQUESTS)) { - numRequests = Bytes.toInt(kv.getValue()); - continue; - } - for (byte val : kv.getValue()) { - friendsOfFriends.add((char)val + ""); - } - } - Set expected = - new HashSet(Arrays.asList(new String[]{"d", "e", "f", "g"})); - assertEquals(expected, friendsOfFriends); - } - - @Test - public void testThreads() throws Exception { - HTable table = prepareTestData(TABLE2, util); - int numThreads = 1000; - startSignal = new CountDownLatch(numThreads); - doneSignal = new CountDownLatch(numThreads); - for (int i = 0; i < numThreads; ++i) { - new Thread(new QueryRunner(table)).start(); - startSignal.countDown(); - } - doneSignal.await(); - Get get = new Get(ROW); - LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).list())); - assertEquals(numThreads, numRequests); - } - - class QueryRunner implements Runnable { - final HTable table; - QueryRunner(final HTable table) { - this.table = table; - } - @Override - public void run() { - try { - startSignal.await(); - verifyProcessRow(table); - } catch (Throwable e) { - e.printStackTrace(); - } - doneSignal.countDown(); - } - } - - static HTable prepareTestData(byte[] tableName, HBaseTestingUtility util) - throws Exception { - HTable table = util.createTable(tableName, FAM); - Put put = new Put(ROW); - put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A - put.add(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B - put.add(FAM, C, G); // G is a friend of C - table.put(put); - return table; - } - - /** - * Coprocessor protocol that finds friends of friends of a person and - * update the number of requests. 
- */ - public static interface FriendsOfFriendsProtocol extends CoprocessorProtocol { - - /** - * Query a person's friends of friends - */ - Result query(byte[] row, byte[] person) throws IOException; - } - - /** - * Finds friends of friends of a person and update the number of requests. - */ - public static class FriendsOfFriendsEndpoint extends BaseEndpointCoprocessor - implements FriendsOfFriendsProtocol, RowProcessor { - byte[] row = null; - byte[] person = null; - Result result = null; - - // - // FriendsOfFriendsProtocol method - // - - @Override - public Result query(byte[] row, byte[] person) throws IOException { - this.row = row; - this.person = person; - HRegion region = - ((RegionCoprocessorEnvironment) getEnvironment()).getRegion(); - region.processRow(this); - return this.getResult(); - } - - // - // RowProcessor methods - // - - FriendsOfFriendsEndpoint() { - } - - @Override - public byte[] getRow() { - return row; - } - - @Override - public Result getResult() { - return result; - } - - @Override - public boolean readOnly() { - return false; - } - - @Override - public void process(long now, RowProcessor.RowScanner scanner, - List mutations, WALEdit walEdit) throws IOException { - List kvs = new ArrayList(); - { // First scan to get friends of the person and numRequests - Scan scan = new Scan(row, row); - scan.addColumn(FAM, person); - scan.addColumn(FAM, REQUESTS); - scanner.doScan(scan, kvs); - } - LOG.debug("first scan:" + stringifyKvs(kvs)); - int numRequests = 0; - // Second scan to get friends of friends - Scan scan = new Scan(row, row); - for (KeyValue kv : kvs) { - if (Bytes.equals(kv.getQualifier(), REQUESTS)) { - numRequests = Bytes.toInt(kv.getValue()); - continue; - } - byte[] friends = kv.getValue(); - for (byte f : friends) { - scan.addColumn(FAM, new byte[]{f}); - } - } - scanner.doScan(scan, kvs); - - LOG.debug("second scan:" + stringifyKvs(kvs)); - numRequests += 1; - // Construct mutations and Result - KeyValue kv = new KeyValue( - row, FAM, REQUESTS, now, Bytes.toBytes(numRequests)); - mutations.clear(); - mutations.add(kv); - kvs.add(kv); - LOG.debug("final result:" + stringifyKvs(kvs) + - " mutations:" + stringifyKvs(mutations)); - result = new Result(kvs); - // Inject some meta data to the walEdit - KeyValue metaKv = new KeyValue( - getRow(), HLog.METAFAMILY, - Bytes.toBytes("FriendsOfFriends query"), - person); - walEdit.add(metaKv); - } - - @Override - public void readFields(DataInput in) throws IOException { - this.person = Bytes.readByteArray(in); - this.row = Bytes.readByteArray(in); - this.result = new Result(); - result.readFields(in); - } - - @Override - public void write(DataOutput out) throws IOException { - Bytes.writeByteArray(out, person); - Bytes.writeByteArray(out, row); - if (result == null) { - new Result().write(out); - } else { - result.write(out); - } - } - - @Override - public UUID getClusterId() { - return HConstants.DEFAULT_CLUSTER_ID; - } - } - - static String stringifyKvs(Collection kvs) { - StringBuilder out = new StringBuilder(); - out.append("["); - for (KeyValue kv : kvs) { - byte[] col = kv.getQualifier(); - byte[] val = kv.getValue(); - if (Bytes.equals(col, REQUESTS)) { - out.append(Bytes.toStringBinary(col) + ":" + - Bytes.toInt(val) + " "); - } else { - out.append(Bytes.toStringBinary(col) + ":" + - Bytes.toStringBinary(val) + " "); - } - } - out.append("]"); - return out.toString(); - } - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new 
org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} diff --git a/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java b/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java new file mode 100644 index 00000000000..1623f17ecaa --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java @@ -0,0 +1,598 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.BaseRowProcessor; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.sun.org.apache.commons.logging.Log; +import com.sun.org.apache.commons.logging.LogFactory; + +/** + * Verifies ProcessRowEndpoint works. + * The tested RowProcessor performs two scans and a read-modify-write. 
+ */ +@Category(SmallTests.class) +public class TestRowProcessorEndpoint { + + static final Log LOG = LogFactory.getLog(TestRowProcessorEndpoint.class); + + private static final byte[] TABLE = Bytes.toBytes("testtable"); + private final static byte[] ROW = Bytes.toBytes("testrow"); + private final static byte[] ROW2 = Bytes.toBytes("testrow2"); + private final static byte[] FAM = Bytes.toBytes("friendlist"); + + // Column names + private final static byte[] A = Bytes.toBytes("a"); + private final static byte[] B = Bytes.toBytes("b"); + private final static byte[] C = Bytes.toBytes("c"); + private final static byte[] D = Bytes.toBytes("d"); + private final static byte[] E = Bytes.toBytes("e"); + private final static byte[] F = Bytes.toBytes("f"); + private final static byte[] G = Bytes.toBytes("g"); + private final static byte[] COUNTER = Bytes.toBytes("counter"); + private final static AtomicLong myTimer = new AtomicLong(0); + private final AtomicInteger failures = new AtomicInteger(0); + + private static HBaseTestingUtility util = new HBaseTestingUtility(); + private static volatile int expectedCounter = 0; + private static int rowSize, row2Size; + + private volatile static HTable table = null; + private volatile static boolean swapped = false; + private volatile CountDownLatch startSignal; + private volatile CountDownLatch doneSignal; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + Configuration conf = util.getConfiguration(); + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + RowProcessorEndpoint.class.getName()); + conf.setInt("hbase.client.retries.number", 1); + conf.setLong("hbase.hregion.row.processor.timeout", 1000L); + util.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + public void prepareTestData() throws Exception { + try { + util.getHBaseAdmin().disableTable(TABLE); + util.getHBaseAdmin().deleteTable(TABLE); + } catch (Exception e) { + // ignore table not found + } + table = util.createTable(TABLE, FAM); + { + Put put = new Put(ROW); + put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A + put.add(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B + put.add(FAM, C, G); // G is a friend of C + table.put(put); + rowSize = put.size(); + } + Put put = new Put(ROW2); + put.add(FAM, D, E); + put.add(FAM, F, G); + table.put(put); + row2Size = put.size(); + } + + @Test + public void testDoubleScan() throws Throwable { + prepareTestData(); + RowProcessorProtocol protocol = + table.coprocessorProxy(RowProcessorProtocol.class, ROW); + RowProcessorEndpoint.FriendsOfFriendsProcessor processor = + new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A); + Set result = protocol.process(processor); + + Set expected = + new HashSet(Arrays.asList(new String[]{"d", "e", "f", "g"})); + Get get = new Get(ROW); + LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).list())); + assertEquals(expected, result); + } + + @Test + public void testReadModifyWrite() throws Throwable { + prepareTestData(); + failures.set(0); + int numThreads = 1000; + concurrentExec(new IncrementRunner(), numThreads); + Get get = new Get(ROW); + LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).list())); + int finalCounter = incrementCounter(table); + assertEquals(numThreads + 1, finalCounter); + assertEquals(0, failures.get()); + } + + class IncrementRunner implements Runnable { + @Override + public void run() { + try { + incrementCounter(table); + } catch (Throwable e) { 
+ e.printStackTrace(); + } + } + } + + private int incrementCounter(HTable table) throws Throwable { + RowProcessorProtocol protocol = + table.coprocessorProxy(RowProcessorProtocol.class, ROW); + RowProcessorEndpoint.IncrementCounterProcessor processor = + new RowProcessorEndpoint.IncrementCounterProcessor(ROW); + int counterValue = protocol.process(processor); + return counterValue; + } + + private void concurrentExec( + final Runnable task, final int numThreads) throws Throwable { + startSignal = new CountDownLatch(numThreads); + doneSignal = new CountDownLatch(numThreads); + for (int i = 0; i < numThreads; ++i) { + new Thread(new Runnable() { + @Override + public void run() { + try { + startSignal.countDown(); + startSignal.await(); + task.run(); + } catch (Throwable e) { + failures.incrementAndGet(); + e.printStackTrace(); + } + doneSignal.countDown(); + } + }).start(); + } + doneSignal.await(); + } + + @Test + public void testMultipleRows() throws Throwable { + prepareTestData(); + failures.set(0); + int numThreads = 1000; + concurrentExec(new SwapRowsRunner(), numThreads); + LOG.debug("row keyvalues:" + + stringifyKvs(table.get(new Get(ROW)).list())); + LOG.debug("row2 keyvalues:" + + stringifyKvs(table.get(new Get(ROW2)).list())); + assertEquals(rowSize, table.get(new Get(ROW)).list().size()); + assertEquals(row2Size, table.get(new Get(ROW2)).list().size()); + assertEquals(0, failures.get()); + } + + class SwapRowsRunner implements Runnable { + @Override + public void run() { + try { + swapRows(table); + } catch (Throwable e) { + e.printStackTrace(); + } + } + } + + private void swapRows(HTable table) throws Throwable { + RowProcessorProtocol protocol = + table.coprocessorProxy(RowProcessorProtocol.class, ROW); + RowProcessorEndpoint.RowSwapProcessor processor = + new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2); + protocol.process(processor); + } + + @Test + public void testTimeout() throws Throwable { + prepareTestData(); + RowProcessorProtocol protocol = + table.coprocessorProxy(RowProcessorProtocol.class, ROW); + RowProcessorEndpoint.TimeoutProcessor processor = + new RowProcessorEndpoint.TimeoutProcessor(ROW); + boolean exceptionCaught = false; + try { + protocol.process(processor); + } catch (Exception e) { + exceptionCaught = true; + } + assertTrue(exceptionCaught); + } + + /** + * This class defines two RowProcessors: + * IncrementCounterProcessor and FriendsOfFriendsProcessor. + * + * We define the RowProcessors as the inner class of the endpoint. + * So they can be loaded with the endpoint on the coprocessor. + */ + public static class RowProcessorEndpoint extends BaseRowProcessorEndpoint + implements RowProcessorProtocol { + + public static class IncrementCounterProcessor extends + BaseRowProcessor implements Writable { + int counter = 0; + byte[] row = new byte[0]; + + /** + * Empty constructor for Writable + */ + IncrementCounterProcessor() { + } + + IncrementCounterProcessor(byte[] row) { + this.row = row; + } + + @Override + public Collection getRowsToLock() { + return Collections.singleton(row); + } + + @Override + public Integer getResult() { + return counter; + } + + @Override + public boolean readOnly() { + return false; + } + + @Override + public void process(long now, HRegion region, + List mutations, WALEdit walEdit) throws IOException { + // Scan current counter + List kvs = new ArrayList(); + Scan scan = new Scan(row, row); + scan.addColumn(FAM, COUNTER); + doScan(region, scan, kvs); + counter = kvs.size() == 0 ? 
0 : + Bytes.toInt(kvs.iterator().next().getValue()); + + // Assert counter value + assertEquals(expectedCounter, counter); + + // Increment counter and send it to both memstore and wal edit + counter += 1; + expectedCounter += 1; + + + KeyValue kv = + new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter)); + mutations.add(kv); + walEdit.add(kv); + + // We can also inject some meta data to the walEdit + KeyValue metaKv = new KeyValue( + row, HLog.METAFAMILY, + Bytes.toBytes("I just increment counter"), + Bytes.toBytes(counter)); + walEdit.add(metaKv); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.row = Bytes.readByteArray(in); + this.counter = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + Bytes.writeByteArray(out, row); + out.writeInt(counter); + } + + } + + public static class FriendsOfFriendsProcessor extends + BaseRowProcessor> implements Writable { + byte[] row = null; + byte[] person = null; + final Set result = new HashSet(); + + /** + * Empty constructor for Writable + */ + FriendsOfFriendsProcessor() { + } + + FriendsOfFriendsProcessor(byte[] row, byte[] person) { + this.row = row; + this.person = person; + } + + @Override + public Collection getRowsToLock() { + return Collections.singleton(row); + } + + @Override + public Set getResult() { + return result; + } + + @Override + public boolean readOnly() { + return true; + } + + @Override + public void process(long now, HRegion region, + List mutations, WALEdit walEdit) throws IOException { + List kvs = new ArrayList(); + { // First scan to get friends of the person + Scan scan = new Scan(row, row); + scan.addColumn(FAM, person); + doScan(region, scan, kvs); + } + + // Second scan to get friends of friends + Scan scan = new Scan(row, row); + for (KeyValue kv : kvs) { + byte[] friends = kv.getValue(); + for (byte f : friends) { + scan.addColumn(FAM, new byte[]{f}); + } + } + doScan(region, scan, kvs); + + // Collect result + result.clear(); + for (KeyValue kv : kvs) { + for (byte b : kv.getValue()) { + result.add((char)b + ""); + } + } + } + + @Override + public void readFields(DataInput in) throws IOException { + this.person = Bytes.readByteArray(in); + this.row = Bytes.readByteArray(in); + int size = in.readInt(); + result.clear(); + for (int i = 0; i < size; ++i) { + result.add(Text.readString(in)); + } + } + + @Override + public void write(DataOutput out) throws IOException { + Bytes.writeByteArray(out, person); + Bytes.writeByteArray(out, row); + out.writeInt(result.size()); + for (String s : result) { + Text.writeString(out, s); + } + } + } + + public static class RowSwapProcessor extends + BaseRowProcessor> implements Writable { + byte[] row1 = new byte[0]; + byte[] row2 = new byte[0]; + + /** + * Empty constructor for Writable + */ + RowSwapProcessor() { + } + + RowSwapProcessor(byte[] row1, byte[] row2) { + this.row1 = row1; + this.row2 = row2; + } + + @Override + public Collection getRowsToLock() { + List rows = new ArrayList(); + rows.add(row1); + rows.add(row2); + return rows; + } + + @Override + public boolean readOnly() { + return false; + } + + @Override + public void process(long now, HRegion region, + List mutations, WALEdit walEdit) throws IOException { + + // Override the time to avoid race-condition in the unit test caused by + // inacurate timer on some machines + now = myTimer.getAndIncrement(); + + // Scan both rows + List kvs1 = new ArrayList(); + List kvs2 = new ArrayList(); + doScan(region, new Scan(row1, row1), kvs1); + 
doScan(region, new Scan(row2, row2), kvs2); + + // Assert swapped + if (swapped) { + assertEquals(rowSize, kvs2.size()); + assertEquals(row2Size, kvs1.size()); + } else { + assertEquals(rowSize, kvs1.size()); + assertEquals(row2Size, kvs2.size()); + } + swapped = !swapped; + + // Add and delete keyvalues + List> kvs = new ArrayList>(); + kvs.add(kvs1); + kvs.add(kvs2); + byte[][] rows = new byte[][]{row1, row2}; + for (int i = 0; i < kvs.size(); ++i) { + for (KeyValue kv : kvs.get(i)) { + // Delete from the current row and add to the other row + KeyValue kvDelete = + new KeyValue(rows[i], kv.getFamily(), kv.getQualifier(), + kv.getTimestamp(), KeyValue.Type.Delete); + KeyValue kvAdd = + new KeyValue(rows[1 - i], kv.getFamily(), kv.getQualifier(), + now, kv.getValue()); + mutations.add(kvDelete); + walEdit.add(kvDelete); + mutations.add(kvAdd); + walEdit.add(kvAdd); + } + } + } + + @Override + public void readFields(DataInput in) throws IOException { + this.row1 = Bytes.readByteArray(in); + this.row2 = Bytes.readByteArray(in); + } + + @Override + public void write(DataOutput out) throws IOException { + Bytes.writeByteArray(out, row1); + Bytes.writeByteArray(out, row2); + } + } + + public static class TimeoutProcessor extends + BaseRowProcessor implements Writable { + + byte[] row = new byte[0]; + + /** + * Empty constructor for Writable + */ + public TimeoutProcessor() { + } + + public TimeoutProcessor(byte[] row) { + this.row = row; + } + + public Collection getRowsToLock() { + return Collections.singleton(row); + } + + @Override + public void process(long now, HRegion region, + List mutations, WALEdit walEdit) throws IOException { + try { + // Sleep for a long time so it timeout + Thread.sleep(100 * 1000L); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public boolean readOnly() { + return true; + } + + @Override + public void readFields(DataInput in) throws IOException { + this.row = Bytes.readByteArray(in); + } + + @Override + public void write(DataOutput out) throws IOException { + Bytes.writeByteArray(out, row); + } + } + + public static void doScan( + HRegion region, Scan scan, List result) throws IOException { + InternalScanner scanner = null; + try { + scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); + scanner = region.getScanner(scan); + result.clear(); + scanner.next(result); + } finally { + if (scanner != null) scanner.close(); + } + } + } + + static String stringifyKvs(Collection kvs) { + StringBuilder out = new StringBuilder(); + out.append("["); + if (kvs != null) { + for (KeyValue kv : kvs) { + byte[] col = kv.getQualifier(); + byte[] val = kv.getValue(); + if (Bytes.equals(col, COUNTER)) { + out.append(Bytes.toStringBinary(col) + ":" + + Bytes.toInt(val) + " "); + } else { + out.append(Bytes.toStringBinary(col) + ":" + + Bytes.toStringBinary(val) + " "); + } + } + } + out.append("]"); + return out.toString(); + } + + @org.junit.Rule + public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = + new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); +}
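
For illustration, here is a minimal client-side sketch of how the endpoint added by this patch might be invoked, modeled on the calls in TestRowProcessorEndpoint above. Only HTable.coprocessorProxy(), RowProcessorProtocol.process(), BaseRowProcessor, HRegion.getScanner() and InternalScanner.next() are taken from the patch; the RowCountProcessor class, the table name "testtable" and the row key "testrow" are hypothetical stand-ins. Generic type parameters (BaseRowProcessor<Integer>, List<KeyValue>, Collection<byte[]>) are spelled out here even though they appear stripped in the rendered diff above. For real use the processor should be an inner class of the coprocessor endpoint, as the BaseRowProcessorEndpoint javadoc advises, so it is class-loaded together with the endpoint; it is shown standalone only to keep the sketch short.

// Hypothetical usage sketch (not part of the patch): a read-only RowProcessor
// that counts the cells in a single row, invoked through the new
// RowProcessorProtocol endpoint.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RowProcessorProtocol;
import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;

public class RowProcessorClientSketch {

  /** Read-only processor that counts the KeyValues stored in one row. */
  public static class RowCountProcessor extends BaseRowProcessor<Integer>
      implements Writable {
    private byte[] row = new byte[0];
    private int count = 0;

    public RowCountProcessor() {}                  // needed for Writable deserialization
    public RowCountProcessor(byte[] row) { this.row = row; }

    @Override
    public Collection<byte[]> getRowsToLock() {
      return Collections.singleton(row);           // lock only the row we read
    }

    @Override
    public boolean readOnly() { return true; }     // no mutations are produced

    @Override
    public Integer getResult() { return count; }

    @Override
    public void process(long now, HRegion region,
        List<KeyValue> mutations, WALEdit walEdit) throws IOException {
      // Scan the locked row inside the region and count its cells.
      InternalScanner scanner = region.getScanner(new Scan(row, row));
      try {
        List<KeyValue> kvs = new ArrayList<KeyValue>();
        scanner.next(kvs);
        count = kvs.size();
      } finally {
        scanner.close();
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      row = Bytes.readByteArray(in);
      count = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      Bytes.writeByteArray(out, row);
      out.writeInt(count);
    }
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    byte[] row = Bytes.toBytes("testrow");         // hypothetical row key
    HTable table = new HTable(conf, "testtable");  // hypothetical table name
    RowProcessorProtocol protocol =
        table.coprocessorProxy(RowProcessorProtocol.class, row);
    int cells = protocol.process(new RowCountProcessor(row));
    System.out.println("cells in row: " + cells);
    table.close();
  }
}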