HBASE-5542 Unify HRegion.mutateRowsWithLocks() and HRegion.processRow() (Scott Chen) part 2
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1303920 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6fb055da00
commit
536ff21825
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RowProcessor;
|
||||
|
||||
/**
|
||||
* This class demonstrates how to implement atomic read-modify-writes
|
||||
* using {@link HRegion#processRowsWithLocks()} and Coprocessor endpoints.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public abstract class BaseRowProcessorEndpoint extends BaseEndpointCoprocessor
|
||||
implements RowProcessorProtocol {
|
||||
|
||||
/**
|
||||
* Pass a processor to HRegion to process multiple rows atomically.
|
||||
*
|
||||
* The RowProcessor implementations should be the inner classes of your
|
||||
* RowProcessorEndpoint. This way the RowProcessor can be class-loaded with
|
||||
* the Coprocessor endpoint together.
|
||||
*
|
||||
* See {@link TestRowProcessorEndpoint} for example.
|
||||
*
|
||||
* @param processor The object defines the read-modify-write procedure
|
||||
* @return The processing result
|
||||
*/
|
||||
@Override
|
||||
public <T> T process(RowProcessor<T> processor)
|
||||
throws IOException {
|
||||
HRegion region =
|
||||
((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
|
||||
region.processRowsWithLocks(processor);
|
||||
return processor.getResult();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RowProcessor;
|
||||
|
||||
/**
|
||||
* Defines a protocol to perform multi row transactions.
|
||||
* See {@link BaseRowProcessorEndpoint} for the implementation.
|
||||
* See {@link HRegion#processRowsWithLocks()} for detials.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public interface RowProcessorProtocol extends CoprocessorProtocol {
|
||||
|
||||
/**
|
||||
* @param processor The processor defines how to process the row
|
||||
*/
|
||||
<T> T process(RowProcessor<T> processor) throws IOException;
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
|
||||
/**
|
||||
* Base class for RowProcessor with some default implementations.
|
||||
*/
|
||||
public abstract class BaseRowProcessor<T> implements RowProcessor<T> {
|
||||
|
||||
@Override
|
||||
public T getResult() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postProcess(HRegion region, WALEdit walEdit) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public UUID getClusterId() {
|
||||
return HConstants.DEFAULT_CLUSTER_ID;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
/**
|
||||
* A <code>MultiRowProcessor</code> that performs multiple puts and deletes.
|
||||
*/
|
||||
class MultiRowMutationProcessor extends BaseRowProcessor<Void> {
|
||||
Collection<byte[]> rowsToLock;
|
||||
Collection<Mutation> mutations;
|
||||
|
||||
MultiRowMutationProcessor(Collection<Mutation> mutations,
|
||||
Collection<byte[]> rowsToLock) {
|
||||
this.rowsToLock = rowsToLock;
|
||||
this.mutations = mutations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<byte[]> getRowsToLock() {
|
||||
return rowsToLock;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readOnly() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(long now,
|
||||
HRegion region,
|
||||
List<KeyValue> mutationKvs,
|
||||
WALEdit walEdit) throws IOException {
|
||||
byte[] byteNow = Bytes.toBytes(now);
|
||||
// Check mutations and apply edits to a single WALEdit
|
||||
for (Mutation m : mutations) {
|
||||
if (m instanceof Put) {
|
||||
Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
|
||||
region.checkFamilies(familyMap.keySet());
|
||||
region.checkTimestamps(familyMap, now);
|
||||
region.updateKVTimestamps(familyMap.values(), byteNow);
|
||||
} else if (m instanceof Delete) {
|
||||
Delete d = (Delete) m;
|
||||
region.prepareDelete(d);
|
||||
region.prepareDeleteTimestamps(d, byteNow);
|
||||
} else {
|
||||
throw new DoNotRetryIOException(
|
||||
"Action must be Put or Delete. But was: "
|
||||
+ m.getClass().getName());
|
||||
}
|
||||
for (List<KeyValue> edits : m.getFamilyMap().values()) {
|
||||
boolean writeToWAL = m.getWriteToWAL();
|
||||
for (KeyValue kv : edits) {
|
||||
mutationKvs.add(kv);
|
||||
if (writeToWAL) {
|
||||
walEdit.add(kv);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
|
||||
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
|
||||
if (coprocessorHost != null) {
|
||||
for (Mutation m : mutations) {
|
||||
if (m instanceof Put) {
|
||||
if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
|
||||
// by pass everything
|
||||
return;
|
||||
}
|
||||
} else if (m instanceof Delete) {
|
||||
Delete d = (Delete) m;
|
||||
region.prepareDelete(d);
|
||||
if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) {
|
||||
// by pass everything
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postProcess(HRegion region, WALEdit walEdit) throws IOException {
|
||||
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
|
||||
if (coprocessorHost != null) {
|
||||
for (Mutation m : mutations) {
|
||||
if (m instanceof Put) {
|
||||
coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
|
||||
} else if (m instanceof Delete) {
|
||||
coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -15,30 +15,40 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
|
||||
/**
|
||||
* Defines the procedure to atomically perform multiple scans and mutations
|
||||
* on one single row. The generic type parameter T is the return type of
|
||||
* on a HRegion.
|
||||
*
|
||||
* This is invoked by {@link HRegion#processRowsWithLocks()}.
|
||||
* This class performs scans and generates mutations and WAL edits.
|
||||
* The locks and MVCC will be handled by HRegion.
|
||||
*
|
||||
* The generic type parameter T is the return type of
|
||||
* RowProcessor.getResult().
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
public interface RowProcessor<T> extends Writable {
|
||||
public interface RowProcessor<T> {
|
||||
|
||||
/**
|
||||
* Which row to perform the read-write
|
||||
* Rows to lock while operation.
|
||||
* They have to be sorted with <code>RowProcessor</code>
|
||||
* to avoid deadlock.
|
||||
*/
|
||||
byte[] getRow();
|
||||
Collection<byte[]> getRowsToLock();
|
||||
|
||||
/**
|
||||
* Obtain the processing result
|
||||
|
@ -53,29 +63,40 @@ public interface RowProcessor<T> extends Writable {
|
|||
boolean readOnly();
|
||||
|
||||
/**
|
||||
* HRegion calls this to process a row. You should override this to create
|
||||
* your own RowProcessor.
|
||||
* HRegion handles the locks and MVCC and invokes this method properly.
|
||||
*
|
||||
* You should override this to create your own RowProcessor.
|
||||
*
|
||||
* If you are doing read-modify-write here, you should consider using
|
||||
* <code>IsolationLevel.READ_UNCOMMITTED</code> for scan because
|
||||
* we advance MVCC after releasing the locks for optimization purpose.
|
||||
*
|
||||
* @param now the current system millisecond
|
||||
* @param scanner the call back object the can be used to scan the row
|
||||
* @param mutations the mutations for HRegion to do
|
||||
* @param walEdit the wal edit here allows inject some other meta data
|
||||
* @param region the HRegion
|
||||
* @param mutations the output mutations to apply to memstore
|
||||
* @param walEdit the output WAL edits to apply to write ahead log
|
||||
*/
|
||||
void process(long now,
|
||||
RowProcessor.RowScanner scanner,
|
||||
HRegion region,
|
||||
List<KeyValue> mutations,
|
||||
WALEdit walEdit) throws IOException;
|
||||
|
||||
/**
|
||||
* The call back provided by HRegion to perform the scans on the row
|
||||
* The hook to be executed before process().
|
||||
*
|
||||
* @param region the HRegion
|
||||
* @param walEdit the output WAL edits to apply to write ahead log
|
||||
*/
|
||||
public interface RowScanner {
|
||||
/**
|
||||
* @param scan The object defines what to read
|
||||
* @param result The scan results will be added here
|
||||
*/
|
||||
void doScan(Scan scan, List<KeyValue> result) throws IOException;
|
||||
}
|
||||
void preProcess(HRegion region, WALEdit walEdit) throws IOException;
|
||||
|
||||
/**
|
||||
* The hook to be executed after process().
|
||||
*
|
||||
* @param region the HRegion
|
||||
* @param walEdit the output WAL edits to apply to write ahead log
|
||||
*/
|
||||
void postProcess(HRegion region, WALEdit walEdit) throws IOException;
|
||||
|
||||
|
||||
/**
|
||||
* @return The replication cluster id.
|
|
@ -0,0 +1,564 @@
|
|||
/**
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.master.SplitLogManager;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
|
||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.zookeeper.AsyncCallback;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
/**
|
||||
* This worker is spawned in every regionserver (should we also spawn one in
|
||||
* the master?). The Worker waits for log splitting tasks to be put up by the
|
||||
* {@link SplitLogManager} running in the master and races with other workers
|
||||
* in other serves to acquire those tasks. The coordination is done via
|
||||
* zookeeper. All the action takes place at /hbase/splitlog znode.
|
||||
* <p>
|
||||
* If a worker has successfully moved the task from state UNASSIGNED to
|
||||
* OWNED then it owns the task. It keeps heart beating the manager by
|
||||
* periodically moving the task from UNASSIGNED to OWNED state. On success it
|
||||
* moves the task to TASK_DONE. On unrecoverable error it moves task state to
|
||||
* ERR. If it cannot continue but wants the master to retry the task then it
|
||||
* moves the task state to RESIGNED.
|
||||
* <p>
|
||||
* The manager can take a task away from a worker by moving the task from
|
||||
* OWNED to UNASSIGNED. In the absence of a global lock there is a
|
||||
* unavoidable race here - a worker might have just finished its task when it
|
||||
* is stripped of its ownership. Here we rely on the idempotency of the log
|
||||
* splitting task for correctness
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class SplitLogWorker extends ZooKeeperListener implements Runnable {
|
||||
private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
|
||||
|
||||
Thread worker;
|
||||
private final String serverName;
|
||||
private final TaskExecutor splitTaskExecutor;
|
||||
private long zkretries;
|
||||
|
||||
private Object taskReadyLock = new Object();
|
||||
volatile int taskReadySeq = 0;
|
||||
private volatile String currentTask = null;
|
||||
private int currentVersion;
|
||||
private volatile boolean exitWorker;
|
||||
private Object grabTaskLock = new Object();
|
||||
private boolean workerInGrabTask = false;
|
||||
|
||||
|
||||
public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
|
||||
String serverName, TaskExecutor splitTaskExecutor) {
|
||||
super(watcher);
|
||||
this.serverName = serverName;
|
||||
this.splitTaskExecutor = splitTaskExecutor;
|
||||
this.zkretries = conf.getLong("hbase.splitlog.zk.retries", 3);
|
||||
}
|
||||
|
||||
public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf,
|
||||
final String serverName) {
|
||||
this(watcher, conf, serverName, new TaskExecutor () {
|
||||
@Override
|
||||
public Status exec(String filename, CancelableProgressable p) {
|
||||
Path rootdir;
|
||||
FileSystem fs;
|
||||
try {
|
||||
rootdir = FSUtils.getRootDir(conf);
|
||||
fs = rootdir.getFileSystem(conf);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("could not find root dir or fs", e);
|
||||
return Status.RESIGNED;
|
||||
}
|
||||
// TODO have to correctly figure out when log splitting has been
|
||||
// interrupted or has encountered a transient error and when it has
|
||||
// encountered a bad non-retry-able persistent error.
|
||||
try {
|
||||
String tmpname =
|
||||
ZKSplitLog.getSplitLogDirTmpComponent(serverName, filename);
|
||||
if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname,
|
||||
fs.getFileStatus(new Path(filename)), fs, conf, p) == false) {
|
||||
return Status.PREEMPTED;
|
||||
}
|
||||
} catch (InterruptedIOException iioe) {
|
||||
LOG.warn("log splitting of " + filename + " interrupted, resigning",
|
||||
iioe);
|
||||
return Status.RESIGNED;
|
||||
} catch (IOException e) {
|
||||
Throwable cause = e.getCause();
|
||||
if (cause instanceof InterruptedException) {
|
||||
LOG.warn("log splitting of " + filename + " interrupted, resigning",
|
||||
e);
|
||||
return Status.RESIGNED;
|
||||
}
|
||||
LOG.warn("log splitting of " + filename + " failed, returning error",
|
||||
e);
|
||||
return Status.ERR;
|
||||
}
|
||||
return Status.DONE;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
LOG.info("SplitLogWorker " + this.serverName + " starting");
|
||||
this.watcher.registerListener(this);
|
||||
int res;
|
||||
// wait for master to create the splitLogZnode
|
||||
res = -1;
|
||||
while (res == -1) {
|
||||
try {
|
||||
res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
|
||||
} catch (KeeperException e) {
|
||||
// ignore
|
||||
LOG.warn("Exception when checking for " + watcher.splitLogZNode +
|
||||
" ... retrying", e);
|
||||
}
|
||||
if (res == -1) {
|
||||
try {
|
||||
LOG.info(watcher.splitLogZNode + " znode does not exist," +
|
||||
" waiting for master to create one");
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode);
|
||||
assert exitWorker == true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taskLoop();
|
||||
} catch (Throwable t) {
|
||||
// only a logical error can cause here. Printing it out
|
||||
// to make debugging easier
|
||||
LOG.error("unexpected error ", t);
|
||||
} finally {
|
||||
LOG.info("SplitLogWorker " + this.serverName + " exiting");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for tasks to become available at /hbase/splitlog zknode. Grab a task
|
||||
* one at a time. This policy puts an upper-limit on the number of
|
||||
* simultaneous log splitting that could be happening in a cluster.
|
||||
* <p>
|
||||
* Synchronization using {@link #task_ready_signal_seq} ensures that it will
|
||||
* try to grab every task that has been put up
|
||||
*/
|
||||
private void taskLoop() {
|
||||
while (true) {
|
||||
int seq_start = taskReadySeq;
|
||||
List<String> paths = getTaskList();
|
||||
if (paths == null) {
|
||||
LOG.warn("Could not get tasks, did someone remove " +
|
||||
this.watcher.splitLogZNode + " ... worker thread exiting.");
|
||||
return;
|
||||
}
|
||||
int offset = (int)(Math.random() * paths.size());
|
||||
for (int i = 0; i < paths.size(); i ++) {
|
||||
int idx = (i + offset) % paths.size();
|
||||
// don't call ZKSplitLog.getNodeName() because that will lead to
|
||||
// double encoding of the path name
|
||||
grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
|
||||
if (exitWorker == true) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
synchronized (taskReadyLock) {
|
||||
while (seq_start == taskReadySeq) {
|
||||
try {
|
||||
taskReadyLock.wait();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("SplitLogWorker interrupted while waiting for task," +
|
||||
" exiting: " + e.toString());
|
||||
assert exitWorker == true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* try to grab a 'lock' on the task zk node to own and execute the task.
|
||||
* <p>
|
||||
* @param path zk node for the task
|
||||
*/
|
||||
private void grabTask(String path) {
|
||||
Stat stat = new Stat();
|
||||
long t = -1;
|
||||
byte[] data;
|
||||
synchronized (grabTaskLock) {
|
||||
currentTask = path;
|
||||
workerInGrabTask = true;
|
||||
if (Thread.interrupted()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
try {
|
||||
try {
|
||||
if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) {
|
||||
tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
|
||||
return;
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to get data for znode " + path, e);
|
||||
tot_wkr_failed_to_grab_task_exception.incrementAndGet();
|
||||
return;
|
||||
}
|
||||
if (TaskState.TASK_UNASSIGNED.equals(data) == false) {
|
||||
tot_wkr_failed_to_grab_task_owned.incrementAndGet();
|
||||
return;
|
||||
}
|
||||
|
||||
currentVersion = stat.getVersion();
|
||||
if (attemptToOwnTask(true) == false) {
|
||||
tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
|
||||
return;
|
||||
}
|
||||
|
||||
if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
|
||||
endTask(TaskState.TASK_DONE, tot_wkr_task_acquired_rescan);
|
||||
return;
|
||||
}
|
||||
LOG.info("worker " + serverName + " acquired task " + path);
|
||||
tot_wkr_task_acquired.incrementAndGet();
|
||||
getDataSetWatchAsync();
|
||||
|
||||
t = System.currentTimeMillis();
|
||||
TaskExecutor.Status status;
|
||||
|
||||
status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask),
|
||||
new CancelableProgressable() {
|
||||
|
||||
@Override
|
||||
public boolean progress() {
|
||||
if (attemptToOwnTask(false) == false) {
|
||||
LOG.warn("Failed to heartbeat the task" + currentTask);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
});
|
||||
switch (status) {
|
||||
case DONE:
|
||||
endTask(TaskState.TASK_DONE, tot_wkr_task_done);
|
||||
break;
|
||||
case PREEMPTED:
|
||||
tot_wkr_preempt_task.incrementAndGet();
|
||||
LOG.warn("task execution prempted " + path);
|
||||
break;
|
||||
case ERR:
|
||||
if (!exitWorker) {
|
||||
endTask(TaskState.TASK_ERR, tot_wkr_task_err);
|
||||
break;
|
||||
}
|
||||
// if the RS is exiting then there is probably a tons of stuff
|
||||
// that can go wrong. Resign instead of signaling error.
|
||||
//$FALL-THROUGH$
|
||||
case RESIGNED:
|
||||
if (exitWorker) {
|
||||
LOG.info("task execution interrupted because worker is exiting " +
|
||||
path);
|
||||
endTask(TaskState.TASK_RESIGNED, tot_wkr_task_resigned);
|
||||
} else {
|
||||
tot_wkr_preempt_task.incrementAndGet();
|
||||
LOG.info("task execution interrupted via zk by manager " +
|
||||
path);
|
||||
}
|
||||
break;
|
||||
}
|
||||
} finally {
|
||||
if (t > 0) {
|
||||
LOG.info("worker " + serverName + " done with task " + path +
|
||||
" in " + (System.currentTimeMillis() - t) + "ms");
|
||||
}
|
||||
synchronized (grabTaskLock) {
|
||||
workerInGrabTask = false;
|
||||
// clear the interrupt from stopTask() otherwise the next task will
|
||||
// suffer
|
||||
Thread.interrupted();
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to own the task by transitioning the zk node data from UNASSIGNED to
|
||||
* OWNED.
|
||||
* <p>
|
||||
* This method is also used to periodically heartbeat the task progress by
|
||||
* transitioning the node from OWNED to OWNED.
|
||||
* <p>
|
||||
* @return true if task path is successfully locked
|
||||
*/
|
||||
private boolean attemptToOwnTask(boolean isFirstTime) {
|
||||
try {
|
||||
Stat stat = this.watcher.getRecoverableZooKeeper().setData(currentTask,
|
||||
TaskState.TASK_OWNED.get(serverName), currentVersion);
|
||||
if (stat == null) {
|
||||
LOG.warn("zk.setData() returned null for path " + currentTask);
|
||||
tot_wkr_task_heartbeat_failed.incrementAndGet();
|
||||
return (false);
|
||||
}
|
||||
currentVersion = stat.getVersion();
|
||||
tot_wkr_task_heartbeat.incrementAndGet();
|
||||
return (true);
|
||||
} catch (KeeperException e) {
|
||||
if (!isFirstTime) {
|
||||
if (e.code().equals(KeeperException.Code.NONODE)) {
|
||||
LOG.warn("NONODE failed to assert ownership for " + currentTask, e);
|
||||
} else if (e.code().equals(KeeperException.Code.BADVERSION)) {
|
||||
LOG.warn("BADVERSION failed to assert ownership for " +
|
||||
currentTask, e);
|
||||
} else {
|
||||
LOG.warn("failed to assert ownership for " + currentTask, e);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e1) {
|
||||
LOG.warn("Interrupted while trying to assert ownership of " +
|
||||
currentTask + " " + StringUtils.stringifyException(e1));
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
tot_wkr_task_heartbeat_failed.incrementAndGet();
|
||||
return (false);
|
||||
}
|
||||
|
||||
/**
|
||||
* endTask() can fail and the only way to recover out of it is for the
|
||||
* {@link SplitLogManager} to timeout the task node.
|
||||
* @param ts
|
||||
* @param ctr
|
||||
*/
|
||||
private void endTask(ZKSplitLog.TaskState ts, AtomicLong ctr) {
|
||||
String path = currentTask;
|
||||
currentTask = null;
|
||||
try {
|
||||
if (ZKUtil.setData(this.watcher, path, ts.get(serverName),
|
||||
currentVersion)) {
|
||||
LOG.info("successfully transitioned task " + path +
|
||||
" to final state " + ts);
|
||||
ctr.incrementAndGet();
|
||||
return;
|
||||
}
|
||||
LOG.warn("failed to transistion task " + path + " to end state " + ts +
|
||||
" because of version mismatch ");
|
||||
} catch (KeeperException.BadVersionException bve) {
|
||||
LOG.warn("transisition task " + path + " to " + ts +
|
||||
" failed because of version mismatch", bve);
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
LOG.fatal("logic error - end task " + path + " " + ts +
|
||||
" failed because task doesn't exist", e);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("failed to end task, " + path + " " + ts, e);
|
||||
}
|
||||
tot_wkr_final_transistion_failed.incrementAndGet();
|
||||
return;
|
||||
}
|
||||
|
||||
void getDataSetWatchAsync() {
|
||||
this.watcher.getRecoverableZooKeeper().getZooKeeper().
|
||||
getData(currentTask, this.watcher,
|
||||
new GetDataAsyncCallback(), null);
|
||||
tot_wkr_get_data_queued.incrementAndGet();
|
||||
}
|
||||
|
||||
void getDataSetWatchSuccess(String path, byte[] data) {
|
||||
synchronized (grabTaskLock) {
|
||||
if (workerInGrabTask) {
|
||||
// currentTask can change but that's ok
|
||||
String taskpath = currentTask;
|
||||
if (taskpath != null && taskpath.equals(path)) {
|
||||
// have to compare data. cannot compare version because then there
|
||||
// will be race with attemptToOwnTask()
|
||||
// cannot just check whether the node has been transitioned to
|
||||
// UNASSIGNED because by the time this worker sets the data watch
|
||||
// the node might have made two transitions - from owned by this
|
||||
// worker to unassigned to owned by another worker
|
||||
if (! TaskState.TASK_OWNED.equals(data, serverName) &&
|
||||
! TaskState.TASK_DONE.equals(data, serverName) &&
|
||||
! TaskState.TASK_ERR.equals(data, serverName) &&
|
||||
! TaskState.TASK_RESIGNED.equals(data, serverName)) {
|
||||
LOG.info("task " + taskpath + " preempted from " +
|
||||
serverName + ", current task state and owner=" +
|
||||
new String(data));
|
||||
stopTask();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void getDataSetWatchFailure(String path) {
|
||||
synchronized (grabTaskLock) {
|
||||
if (workerInGrabTask) {
|
||||
// currentTask can change but that's ok
|
||||
String taskpath = currentTask;
|
||||
if (taskpath != null && taskpath.equals(path)) {
|
||||
LOG.info("retrying data watch on " + path);
|
||||
tot_wkr_get_data_retry.incrementAndGet();
|
||||
getDataSetWatchAsync();
|
||||
} else {
|
||||
// no point setting a watch on the task which this worker is not
|
||||
// working upon anymore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void nodeDataChanged(String path) {
|
||||
// there will be a self generated dataChanged event every time attemptToOwnTask()
|
||||
// heartbeats the task znode by upping its version
|
||||
synchronized (grabTaskLock) {
|
||||
if (workerInGrabTask) {
|
||||
// currentTask can change
|
||||
String taskpath = currentTask;
|
||||
if (taskpath!= null && taskpath.equals(path)) {
|
||||
getDataSetWatchAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private List<String> getTaskList() {
|
||||
for (int i = 0; i < zkretries; i++) {
|
||||
try {
|
||||
return (ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
|
||||
this.watcher.splitLogZNode));
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Could not get children of znode " +
|
||||
this.watcher.splitLogZNode, e);
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e1) {
|
||||
LOG.warn("Interrupted while trying to get task list ...", e1);
|
||||
Thread.currentThread().interrupt();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG.warn("Tried " + zkretries + " times, still couldn't fetch " +
|
||||
"children of " + watcher.splitLogZNode + " giving up");
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void nodeChildrenChanged(String path) {
|
||||
if(path.equals(watcher.splitLogZNode)) {
|
||||
LOG.debug("tasks arrived or departed");
|
||||
synchronized (taskReadyLock) {
|
||||
taskReadySeq++;
|
||||
taskReadyLock.notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If the worker is doing a task i.e. splitting a log file then stop the task.
|
||||
* It doesn't exit the worker thread.
|
||||
*/
|
||||
void stopTask() {
|
||||
LOG.info("Sending interrupt to stop the worker thread");
|
||||
worker.interrupt(); // TODO interrupt often gets swallowed, do what else?
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* start the SplitLogWorker thread
|
||||
*/
|
||||
public void start() {
|
||||
worker = new Thread(null, this, "SplitLogWorker-" + serverName);
|
||||
exitWorker = false;
|
||||
worker.start();
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* stop the SplitLogWorker thread
|
||||
*/
|
||||
public void stop() {
|
||||
exitWorker = true;
|
||||
stopTask();
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronous handler for zk get-data-set-watch on node results.
|
||||
*/
|
||||
class GetDataAsyncCallback implements AsyncCallback.DataCallback {
|
||||
private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
|
||||
|
||||
@Override
|
||||
public void processResult(int rc, String path, Object ctx, byte[] data,
|
||||
Stat stat) {
|
||||
tot_wkr_get_data_result.incrementAndGet();
|
||||
if (rc != 0) {
|
||||
LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
|
||||
getDataSetWatchFailure(path);
|
||||
return;
|
||||
}
|
||||
data = watcher.getRecoverableZooKeeper().removeMetaData(data);
|
||||
getDataSetWatchSuccess(path, data);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Objects implementing this interface actually do the task that has been
|
||||
* acquired by a {@link SplitLogWorker}. Since there isn't a water-tight
|
||||
* guarantee that two workers will not be executing the same task therefore it
|
||||
* is better to have workers prepare the task and then have the
|
||||
* {@link SplitLogManager} commit the work in SplitLogManager.TaskFinisher
|
||||
*/
|
||||
static public interface TaskExecutor {
|
||||
static public enum Status {
|
||||
DONE(),
|
||||
ERR(),
|
||||
RESIGNED(),
|
||||
PREEMPTED();
|
||||
}
|
||||
public Status exec(String name, CancelableProgressable p);
|
||||
}
|
||||
}
|
|
@ -1,321 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.sun.org.apache.commons.logging.Log;
|
||||
import com.sun.org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* Verifies ProcessRowEndpoint works.
|
||||
* The tested RowProcessor performs two scans and a read-modify-write.
|
||||
*/
|
||||
@Category(SmallTests.class)
|
||||
public class TestProcessRowEndpoint {
|
||||
|
||||
static final Log LOG = LogFactory.getLog(TestProcessRowEndpoint.class);
|
||||
|
||||
private static final byte[] TABLE = Bytes.toBytes("testtable");
|
||||
private static final byte[] TABLE2 = Bytes.toBytes("testtable2");
|
||||
private final static byte[] ROW = Bytes.toBytes("testrow");
|
||||
private final static byte[] FAM = Bytes.toBytes("friendlist");
|
||||
|
||||
// Column names
|
||||
private final static byte[] A = Bytes.toBytes("a");
|
||||
private final static byte[] B = Bytes.toBytes("b");
|
||||
private final static byte[] C = Bytes.toBytes("c");
|
||||
private final static byte[] D = Bytes.toBytes("d");
|
||||
private final static byte[] E = Bytes.toBytes("e");
|
||||
private final static byte[] F = Bytes.toBytes("f");
|
||||
private final static byte[] G = Bytes.toBytes("g");
|
||||
private final static byte[] REQUESTS = Bytes.toBytes("requests");
|
||||
|
||||
private static HBaseTestingUtility util = new HBaseTestingUtility();
|
||||
private volatile int numRequests;
|
||||
|
||||
private CountDownLatch startSignal;
|
||||
private CountDownLatch doneSignal;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupBeforeClass() throws Exception {
|
||||
Configuration conf = util.getConfiguration();
|
||||
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
FriendsOfFriendsEndpoint.class.getName());
|
||||
util.startMiniCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
util.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingle() throws Throwable {
|
||||
HTable table = prepareTestData(TABLE, util);
|
||||
verifyProcessRow(table);
|
||||
assertEquals(1, numRequests);
|
||||
}
|
||||
|
||||
private void verifyProcessRow(HTable table) throws Throwable {
|
||||
|
||||
FriendsOfFriendsProtocol processor =
|
||||
table.coprocessorProxy(FriendsOfFriendsProtocol.class, ROW);
|
||||
Result result = processor.query(ROW, A);
|
||||
|
||||
Set<String> friendsOfFriends = new HashSet<String>();
|
||||
for (KeyValue kv : result.raw()) {
|
||||
if (Bytes.equals(kv.getQualifier(), REQUESTS)) {
|
||||
numRequests = Bytes.toInt(kv.getValue());
|
||||
continue;
|
||||
}
|
||||
for (byte val : kv.getValue()) {
|
||||
friendsOfFriends.add((char)val + "");
|
||||
}
|
||||
}
|
||||
Set<String> expected =
|
||||
new HashSet<String>(Arrays.asList(new String[]{"d", "e", "f", "g"}));
|
||||
assertEquals(expected, friendsOfFriends);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThreads() throws Exception {
|
||||
HTable table = prepareTestData(TABLE2, util);
|
||||
int numThreads = 1000;
|
||||
startSignal = new CountDownLatch(numThreads);
|
||||
doneSignal = new CountDownLatch(numThreads);
|
||||
for (int i = 0; i < numThreads; ++i) {
|
||||
new Thread(new QueryRunner(table)).start();
|
||||
startSignal.countDown();
|
||||
}
|
||||
doneSignal.await();
|
||||
Get get = new Get(ROW);
|
||||
LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).list()));
|
||||
assertEquals(numThreads, numRequests);
|
||||
}
|
||||
|
||||
class QueryRunner implements Runnable {
|
||||
final HTable table;
|
||||
QueryRunner(final HTable table) {
|
||||
this.table = table;
|
||||
}
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
startSignal.await();
|
||||
verifyProcessRow(table);
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
doneSignal.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
static HTable prepareTestData(byte[] tableName, HBaseTestingUtility util)
|
||||
throws Exception {
|
||||
HTable table = util.createTable(tableName, FAM);
|
||||
Put put = new Put(ROW);
|
||||
put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A
|
||||
put.add(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B
|
||||
put.add(FAM, C, G); // G is a friend of C
|
||||
table.put(put);
|
||||
return table;
|
||||
}
|
||||
|
||||
/**
|
||||
* Coprocessor protocol that finds friends of friends of a person and
|
||||
* update the number of requests.
|
||||
*/
|
||||
public static interface FriendsOfFriendsProtocol extends CoprocessorProtocol {
|
||||
|
||||
/**
|
||||
* Query a person's friends of friends
|
||||
*/
|
||||
Result query(byte[] row, byte[] person) throws IOException;
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds friends of friends of a person and update the number of requests.
|
||||
*/
|
||||
public static class FriendsOfFriendsEndpoint extends BaseEndpointCoprocessor
|
||||
implements FriendsOfFriendsProtocol, RowProcessor<Result> {
|
||||
byte[] row = null;
|
||||
byte[] person = null;
|
||||
Result result = null;
|
||||
|
||||
//
|
||||
// FriendsOfFriendsProtocol method
|
||||
//
|
||||
|
||||
@Override
|
||||
public Result query(byte[] row, byte[] person) throws IOException {
|
||||
this.row = row;
|
||||
this.person = person;
|
||||
HRegion region =
|
||||
((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
|
||||
region.processRow(this);
|
||||
return this.getResult();
|
||||
}
|
||||
|
||||
//
|
||||
// RowProcessor methods
|
||||
//
|
||||
|
||||
FriendsOfFriendsEndpoint() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getRow() {
|
||||
return row;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result getResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readOnly() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(long now, RowProcessor.RowScanner scanner,
|
||||
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
|
||||
List<KeyValue> kvs = new ArrayList<KeyValue>();
|
||||
{ // First scan to get friends of the person and numRequests
|
||||
Scan scan = new Scan(row, row);
|
||||
scan.addColumn(FAM, person);
|
||||
scan.addColumn(FAM, REQUESTS);
|
||||
scanner.doScan(scan, kvs);
|
||||
}
|
||||
LOG.debug("first scan:" + stringifyKvs(kvs));
|
||||
int numRequests = 0;
|
||||
// Second scan to get friends of friends
|
||||
Scan scan = new Scan(row, row);
|
||||
for (KeyValue kv : kvs) {
|
||||
if (Bytes.equals(kv.getQualifier(), REQUESTS)) {
|
||||
numRequests = Bytes.toInt(kv.getValue());
|
||||
continue;
|
||||
}
|
||||
byte[] friends = kv.getValue();
|
||||
for (byte f : friends) {
|
||||
scan.addColumn(FAM, new byte[]{f});
|
||||
}
|
||||
}
|
||||
scanner.doScan(scan, kvs);
|
||||
|
||||
LOG.debug("second scan:" + stringifyKvs(kvs));
|
||||
numRequests += 1;
|
||||
// Construct mutations and Result
|
||||
KeyValue kv = new KeyValue(
|
||||
row, FAM, REQUESTS, now, Bytes.toBytes(numRequests));
|
||||
mutations.clear();
|
||||
mutations.add(kv);
|
||||
kvs.add(kv);
|
||||
LOG.debug("final result:" + stringifyKvs(kvs) +
|
||||
" mutations:" + stringifyKvs(mutations));
|
||||
result = new Result(kvs);
|
||||
// Inject some meta data to the walEdit
|
||||
KeyValue metaKv = new KeyValue(
|
||||
getRow(), HLog.METAFAMILY,
|
||||
Bytes.toBytes("FriendsOfFriends query"),
|
||||
person);
|
||||
walEdit.add(metaKv);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
this.person = Bytes.readByteArray(in);
|
||||
this.row = Bytes.readByteArray(in);
|
||||
this.result = new Result();
|
||||
result.readFields(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
Bytes.writeByteArray(out, person);
|
||||
Bytes.writeByteArray(out, row);
|
||||
if (result == null) {
|
||||
new Result().write(out);
|
||||
} else {
|
||||
result.write(out);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public UUID getClusterId() {
|
||||
return HConstants.DEFAULT_CLUSTER_ID;
|
||||
}
|
||||
}
|
||||
|
||||
static String stringifyKvs(Collection<KeyValue> kvs) {
|
||||
StringBuilder out = new StringBuilder();
|
||||
out.append("[");
|
||||
for (KeyValue kv : kvs) {
|
||||
byte[] col = kv.getQualifier();
|
||||
byte[] val = kv.getValue();
|
||||
if (Bytes.equals(col, REQUESTS)) {
|
||||
out.append(Bytes.toStringBinary(col) + ":" +
|
||||
Bytes.toInt(val) + " ");
|
||||
} else {
|
||||
out.append(Bytes.toStringBinary(col) + ":" +
|
||||
Bytes.toStringBinary(val) + " ");
|
||||
}
|
||||
}
|
||||
out.append("]");
|
||||
return out.toString();
|
||||
}
|
||||
|
||||
@org.junit.Rule
|
||||
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
||||
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
||||
}
|
|
@ -0,0 +1,598 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.IsolationLevel;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.sun.org.apache.commons.logging.Log;
|
||||
import com.sun.org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* Verifies ProcessRowEndpoint works.
|
||||
* The tested RowProcessor performs two scans and a read-modify-write.
|
||||
*/
|
||||
@Category(SmallTests.class)
|
||||
public class TestRowProcessorEndpoint {
|
||||
|
||||
static final Log LOG = LogFactory.getLog(TestRowProcessorEndpoint.class);
|
||||
|
||||
private static final byte[] TABLE = Bytes.toBytes("testtable");
|
||||
private final static byte[] ROW = Bytes.toBytes("testrow");
|
||||
private final static byte[] ROW2 = Bytes.toBytes("testrow2");
|
||||
private final static byte[] FAM = Bytes.toBytes("friendlist");
|
||||
|
||||
// Column names
|
||||
private final static byte[] A = Bytes.toBytes("a");
|
||||
private final static byte[] B = Bytes.toBytes("b");
|
||||
private final static byte[] C = Bytes.toBytes("c");
|
||||
private final static byte[] D = Bytes.toBytes("d");
|
||||
private final static byte[] E = Bytes.toBytes("e");
|
||||
private final static byte[] F = Bytes.toBytes("f");
|
||||
private final static byte[] G = Bytes.toBytes("g");
|
||||
private final static byte[] COUNTER = Bytes.toBytes("counter");
|
||||
private final static AtomicLong myTimer = new AtomicLong(0);
|
||||
private final AtomicInteger failures = new AtomicInteger(0);
|
||||
|
||||
private static HBaseTestingUtility util = new HBaseTestingUtility();
|
||||
private static volatile int expectedCounter = 0;
|
||||
private static int rowSize, row2Size;
|
||||
|
||||
private volatile static HTable table = null;
|
||||
private volatile static boolean swapped = false;
|
||||
private volatile CountDownLatch startSignal;
|
||||
private volatile CountDownLatch doneSignal;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupBeforeClass() throws Exception {
|
||||
Configuration conf = util.getConfiguration();
|
||||
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
RowProcessorEndpoint.class.getName());
|
||||
conf.setInt("hbase.client.retries.number", 1);
|
||||
conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
|
||||
util.startMiniCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
util.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
public void prepareTestData() throws Exception {
|
||||
try {
|
||||
util.getHBaseAdmin().disableTable(TABLE);
|
||||
util.getHBaseAdmin().deleteTable(TABLE);
|
||||
} catch (Exception e) {
|
||||
// ignore table not found
|
||||
}
|
||||
table = util.createTable(TABLE, FAM);
|
||||
{
|
||||
Put put = new Put(ROW);
|
||||
put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A
|
||||
put.add(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B
|
||||
put.add(FAM, C, G); // G is a friend of C
|
||||
table.put(put);
|
||||
rowSize = put.size();
|
||||
}
|
||||
Put put = new Put(ROW2);
|
||||
put.add(FAM, D, E);
|
||||
put.add(FAM, F, G);
|
||||
table.put(put);
|
||||
row2Size = put.size();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDoubleScan() throws Throwable {
|
||||
prepareTestData();
|
||||
RowProcessorProtocol protocol =
|
||||
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
||||
RowProcessorEndpoint.FriendsOfFriendsProcessor processor =
|
||||
new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
|
||||
Set<String> result = protocol.process(processor);
|
||||
|
||||
Set<String> expected =
|
||||
new HashSet<String>(Arrays.asList(new String[]{"d", "e", "f", "g"}));
|
||||
Get get = new Get(ROW);
|
||||
LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).list()));
|
||||
assertEquals(expected, result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadModifyWrite() throws Throwable {
|
||||
prepareTestData();
|
||||
failures.set(0);
|
||||
int numThreads = 1000;
|
||||
concurrentExec(new IncrementRunner(), numThreads);
|
||||
Get get = new Get(ROW);
|
||||
LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).list()));
|
||||
int finalCounter = incrementCounter(table);
|
||||
assertEquals(numThreads + 1, finalCounter);
|
||||
assertEquals(0, failures.get());
|
||||
}
|
||||
|
||||
class IncrementRunner implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
incrementCounter(table);
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private int incrementCounter(HTable table) throws Throwable {
|
||||
RowProcessorProtocol protocol =
|
||||
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
||||
RowProcessorEndpoint.IncrementCounterProcessor processor =
|
||||
new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
|
||||
int counterValue = protocol.process(processor);
|
||||
return counterValue;
|
||||
}
|
||||
|
||||
private void concurrentExec(
|
||||
final Runnable task, final int numThreads) throws Throwable {
|
||||
startSignal = new CountDownLatch(numThreads);
|
||||
doneSignal = new CountDownLatch(numThreads);
|
||||
for (int i = 0; i < numThreads; ++i) {
|
||||
new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
startSignal.countDown();
|
||||
startSignal.await();
|
||||
task.run();
|
||||
} catch (Throwable e) {
|
||||
failures.incrementAndGet();
|
||||
e.printStackTrace();
|
||||
}
|
||||
doneSignal.countDown();
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
doneSignal.await();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleRows() throws Throwable {
|
||||
prepareTestData();
|
||||
failures.set(0);
|
||||
int numThreads = 1000;
|
||||
concurrentExec(new SwapRowsRunner(), numThreads);
|
||||
LOG.debug("row keyvalues:" +
|
||||
stringifyKvs(table.get(new Get(ROW)).list()));
|
||||
LOG.debug("row2 keyvalues:" +
|
||||
stringifyKvs(table.get(new Get(ROW2)).list()));
|
||||
assertEquals(rowSize, table.get(new Get(ROW)).list().size());
|
||||
assertEquals(row2Size, table.get(new Get(ROW2)).list().size());
|
||||
assertEquals(0, failures.get());
|
||||
}
|
||||
|
||||
class SwapRowsRunner implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
swapRows(table);
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void swapRows(HTable table) throws Throwable {
|
||||
RowProcessorProtocol protocol =
|
||||
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
||||
RowProcessorEndpoint.RowSwapProcessor processor =
|
||||
new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2);
|
||||
protocol.process(processor);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeout() throws Throwable {
|
||||
prepareTestData();
|
||||
RowProcessorProtocol protocol =
|
||||
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
||||
RowProcessorEndpoint.TimeoutProcessor processor =
|
||||
new RowProcessorEndpoint.TimeoutProcessor(ROW);
|
||||
boolean exceptionCaught = false;
|
||||
try {
|
||||
protocol.process(processor);
|
||||
} catch (Exception e) {
|
||||
exceptionCaught = true;
|
||||
}
|
||||
assertTrue(exceptionCaught);
|
||||
}
|
||||
|
||||
/**
|
||||
* This class defines two RowProcessors:
|
||||
* IncrementCounterProcessor and FriendsOfFriendsProcessor.
|
||||
*
|
||||
* We define the RowProcessors as the inner class of the endpoint.
|
||||
* So they can be loaded with the endpoint on the coprocessor.
|
||||
*/
|
||||
public static class RowProcessorEndpoint extends BaseRowProcessorEndpoint
|
||||
implements RowProcessorProtocol {
|
||||
|
||||
public static class IncrementCounterProcessor extends
|
||||
BaseRowProcessor<Integer> implements Writable {
|
||||
int counter = 0;
|
||||
byte[] row = new byte[0];
|
||||
|
||||
/**
|
||||
* Empty constructor for Writable
|
||||
*/
|
||||
IncrementCounterProcessor() {
|
||||
}
|
||||
|
||||
IncrementCounterProcessor(byte[] row) {
|
||||
this.row = row;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<byte[]> getRowsToLock() {
|
||||
return Collections.singleton(row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer getResult() {
|
||||
return counter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readOnly() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(long now, HRegion region,
|
||||
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
|
||||
// Scan current counter
|
||||
List<KeyValue> kvs = new ArrayList<KeyValue>();
|
||||
Scan scan = new Scan(row, row);
|
||||
scan.addColumn(FAM, COUNTER);
|
||||
doScan(region, scan, kvs);
|
||||
counter = kvs.size() == 0 ? 0 :
|
||||
Bytes.toInt(kvs.iterator().next().getValue());
|
||||
|
||||
// Assert counter value
|
||||
assertEquals(expectedCounter, counter);
|
||||
|
||||
// Increment counter and send it to both memstore and wal edit
|
||||
counter += 1;
|
||||
expectedCounter += 1;
|
||||
|
||||
|
||||
KeyValue kv =
|
||||
new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
|
||||
mutations.add(kv);
|
||||
walEdit.add(kv);
|
||||
|
||||
// We can also inject some meta data to the walEdit
|
||||
KeyValue metaKv = new KeyValue(
|
||||
row, HLog.METAFAMILY,
|
||||
Bytes.toBytes("I just increment counter"),
|
||||
Bytes.toBytes(counter));
|
||||
walEdit.add(metaKv);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
this.row = Bytes.readByteArray(in);
|
||||
this.counter = in.readInt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
Bytes.writeByteArray(out, row);
|
||||
out.writeInt(counter);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class FriendsOfFriendsProcessor extends
|
||||
BaseRowProcessor<Set<String>> implements Writable {
|
||||
byte[] row = null;
|
||||
byte[] person = null;
|
||||
final Set<String> result = new HashSet<String>();
|
||||
|
||||
/**
|
||||
* Empty constructor for Writable
|
||||
*/
|
||||
FriendsOfFriendsProcessor() {
|
||||
}
|
||||
|
||||
FriendsOfFriendsProcessor(byte[] row, byte[] person) {
|
||||
this.row = row;
|
||||
this.person = person;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<byte[]> getRowsToLock() {
|
||||
return Collections.singleton(row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readOnly() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(long now, HRegion region,
|
||||
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
|
||||
List<KeyValue> kvs = new ArrayList<KeyValue>();
|
||||
{ // First scan to get friends of the person
|
||||
Scan scan = new Scan(row, row);
|
||||
scan.addColumn(FAM, person);
|
||||
doScan(region, scan, kvs);
|
||||
}
|
||||
|
||||
// Second scan to get friends of friends
|
||||
Scan scan = new Scan(row, row);
|
||||
for (KeyValue kv : kvs) {
|
||||
byte[] friends = kv.getValue();
|
||||
for (byte f : friends) {
|
||||
scan.addColumn(FAM, new byte[]{f});
|
||||
}
|
||||
}
|
||||
doScan(region, scan, kvs);
|
||||
|
||||
// Collect result
|
||||
result.clear();
|
||||
for (KeyValue kv : kvs) {
|
||||
for (byte b : kv.getValue()) {
|
||||
result.add((char)b + "");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
this.person = Bytes.readByteArray(in);
|
||||
this.row = Bytes.readByteArray(in);
|
||||
int size = in.readInt();
|
||||
result.clear();
|
||||
for (int i = 0; i < size; ++i) {
|
||||
result.add(Text.readString(in));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
Bytes.writeByteArray(out, person);
|
||||
Bytes.writeByteArray(out, row);
|
||||
out.writeInt(result.size());
|
||||
for (String s : result) {
|
||||
Text.writeString(out, s);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class RowSwapProcessor extends
|
||||
BaseRowProcessor<Set<String>> implements Writable {
|
||||
byte[] row1 = new byte[0];
|
||||
byte[] row2 = new byte[0];
|
||||
|
||||
/**
|
||||
* Empty constructor for Writable
|
||||
*/
|
||||
RowSwapProcessor() {
|
||||
}
|
||||
|
||||
RowSwapProcessor(byte[] row1, byte[] row2) {
|
||||
this.row1 = row1;
|
||||
this.row2 = row2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<byte[]> getRowsToLock() {
|
||||
List<byte[]> rows = new ArrayList<byte[]>();
|
||||
rows.add(row1);
|
||||
rows.add(row2);
|
||||
return rows;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readOnly() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(long now, HRegion region,
|
||||
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
|
||||
|
||||
// Override the time to avoid race-condition in the unit test caused by
|
||||
// inacurate timer on some machines
|
||||
now = myTimer.getAndIncrement();
|
||||
|
||||
// Scan both rows
|
||||
List<KeyValue> kvs1 = new ArrayList<KeyValue>();
|
||||
List<KeyValue> kvs2 = new ArrayList<KeyValue>();
|
||||
doScan(region, new Scan(row1, row1), kvs1);
|
||||
doScan(region, new Scan(row2, row2), kvs2);
|
||||
|
||||
// Assert swapped
|
||||
if (swapped) {
|
||||
assertEquals(rowSize, kvs2.size());
|
||||
assertEquals(row2Size, kvs1.size());
|
||||
} else {
|
||||
assertEquals(rowSize, kvs1.size());
|
||||
assertEquals(row2Size, kvs2.size());
|
||||
}
|
||||
swapped = !swapped;
|
||||
|
||||
// Add and delete keyvalues
|
||||
List<List<KeyValue>> kvs = new ArrayList<List<KeyValue>>();
|
||||
kvs.add(kvs1);
|
||||
kvs.add(kvs2);
|
||||
byte[][] rows = new byte[][]{row1, row2};
|
||||
for (int i = 0; i < kvs.size(); ++i) {
|
||||
for (KeyValue kv : kvs.get(i)) {
|
||||
// Delete from the current row and add to the other row
|
||||
KeyValue kvDelete =
|
||||
new KeyValue(rows[i], kv.getFamily(), kv.getQualifier(),
|
||||
kv.getTimestamp(), KeyValue.Type.Delete);
|
||||
KeyValue kvAdd =
|
||||
new KeyValue(rows[1 - i], kv.getFamily(), kv.getQualifier(),
|
||||
now, kv.getValue());
|
||||
mutations.add(kvDelete);
|
||||
walEdit.add(kvDelete);
|
||||
mutations.add(kvAdd);
|
||||
walEdit.add(kvAdd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
this.row1 = Bytes.readByteArray(in);
|
||||
this.row2 = Bytes.readByteArray(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
Bytes.writeByteArray(out, row1);
|
||||
Bytes.writeByteArray(out, row2);
|
||||
}
|
||||
}
|
||||
|
||||
public static class TimeoutProcessor extends
|
||||
BaseRowProcessor<Void> implements Writable {
|
||||
|
||||
byte[] row = new byte[0];
|
||||
|
||||
/**
|
||||
* Empty constructor for Writable
|
||||
*/
|
||||
public TimeoutProcessor() {
|
||||
}
|
||||
|
||||
public TimeoutProcessor(byte[] row) {
|
||||
this.row = row;
|
||||
}
|
||||
|
||||
public Collection<byte[]> getRowsToLock() {
|
||||
return Collections.singleton(row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(long now, HRegion region,
|
||||
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
|
||||
try {
|
||||
// Sleep for a long time so it timeout
|
||||
Thread.sleep(100 * 1000L);
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readOnly() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
this.row = Bytes.readByteArray(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
Bytes.writeByteArray(out, row);
|
||||
}
|
||||
}
|
||||
|
||||
public static void doScan(
|
||||
HRegion region, Scan scan, List<KeyValue> result) throws IOException {
|
||||
InternalScanner scanner = null;
|
||||
try {
|
||||
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
|
||||
scanner = region.getScanner(scan);
|
||||
result.clear();
|
||||
scanner.next(result);
|
||||
} finally {
|
||||
if (scanner != null) scanner.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static String stringifyKvs(Collection<KeyValue> kvs) {
|
||||
StringBuilder out = new StringBuilder();
|
||||
out.append("[");
|
||||
if (kvs != null) {
|
||||
for (KeyValue kv : kvs) {
|
||||
byte[] col = kv.getQualifier();
|
||||
byte[] val = kv.getValue();
|
||||
if (Bytes.equals(col, COUNTER)) {
|
||||
out.append(Bytes.toStringBinary(col) + ":" +
|
||||
Bytes.toInt(val) + " ");
|
||||
} else {
|
||||
out.append(Bytes.toStringBinary(col) + ":" +
|
||||
Bytes.toStringBinary(val) + " ");
|
||||
}
|
||||
}
|
||||
}
|
||||
out.append("]");
|
||||
return out.toString();
|
||||
}
|
||||
|
||||
@org.junit.Rule
|
||||
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
||||
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
||||
}
|
Loading…
Reference in New Issue