From ba3426321abc327246dd869243eb54de98ba05c4 Mon Sep 17 00:00:00 2001 From: Andrew Kyle Purtell Date: Mon, 7 Feb 2011 23:01:28 +0000 Subject: [PATCH] HBASE-3257 Coprocessors: Extend server side API to include HLog operations git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1068206 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 13 +- .../BaseRegionObserverCoprocessor.java | 13 + .../hbase/coprocessor/CoprocessorHost.java | 3 + .../hbase/coprocessor/RegionObserver.java | 29 ++ .../WALCoprocessorEnvironment.java | 28 ++ .../hadoop/hbase/coprocessor/WALObserver.java | 67 ++++ .../hadoop/hbase/regionserver/HRegion.java | 14 + .../regionserver/RegionCoprocessorHost.java | 55 +++ .../hadoop/hbase/regionserver/wal/HLog.java | 17 +- .../regionserver/wal/WALCoprocessorHost.java | 142 +++++++ .../coprocessor/SampleRegionWALObserver.java | 162 ++++++++ .../coprocessor/SimpleRegionObserver.java | 10 + .../coprocessor/TestWALCoprocessors.java | 373 ++++++++++++++++++ .../hbase/regionserver/wal/TestHLog.java | 18 + 14 files changed, 938 insertions(+), 6 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java create mode 100644 src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java create mode 100644 src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java create mode 100644 src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALCoprocessors.java diff --git a/CHANGES.txt b/CHANGES.txt index 087d742e89c..a89461e836f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -49,8 +49,6 @@ Release 0.91.0 - Unreleased IMPROVEMENTS - HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via - Andrew Purtell) HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack) HBASE-3292 Expose block cache hit/miss/evict counts into region server metrics @@ -60,9 +58,6 @@ Release 0.91.0 - Unreleased HBASE-1861 Multi-Family support for bulk upload tools HBASE-3308 SplitTransaction.splitStoreFiles slows splits a lot HBASE-3328 Added Admin API to specify explicit split points - HBASE-3345 Coprocessors: Allow observers to completely override base - function - HBASE-3260 Coprocessors: Add explicit lifecycle management HBASE-3377 Upgrade Jetty to 6.1.26 HBASE-3387 Pair does not deep check arrays for equality (Jesse Yates via Stack) @@ -77,16 +72,24 @@ Release 0.91.0 - Unreleased NEW FEATURES + HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via + Andrew Purtell) HBASE-3287 Add option to cache blocks on hfile write and evict blocks on hfile close HBASE-3335 Add BitComparator for filtering (Nathaniel Cook via Stack) + HBASE-3260 Coprocessors: Add explicit lifecycle management HBASE-3256 Coprocessors: Coprocessor host and observer for HMaster + HBASE-3345 Coprocessors: Allow observers to completely override base + function HBASE-3448 RegionSplitter, utility class to manually split tables HBASE-2824 A filter that randomly includes rows based on a configured chance (Ferdy via Andrew Purtell) HBASE-3455 Add memstore-local allocation buffers to combat heap fragmentation in the region server. 
Enabled by default as of 0.91 + HBASE-3257 Coprocessors: Extend server side API to include HLog operations + (Mingjie Lai via Andrew Purtell) + Release 0.90.1 - Unreleased diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java index a8ee5e76a78..ac1e230c548 100644 --- a/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coprocessor; import java.util.List; import java.util.Map; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -28,6 +29,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import java.io.IOException; @@ -222,4 +225,14 @@ public abstract class BaseRegionObserverCoprocessor implements RegionObserver { public void postScannerClose(final RegionCoprocessorEnvironment e, final InternalScanner s) throws IOException { } + + @Override + public void preWALRestore(RegionCoprocessorEnvironment env, HRegionInfo info, + HLogKey logKey, WALEdit logEdit) throws IOException { + } + + @Override + public void postWALRestore(RegionCoprocessorEnvironment env, + HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { + } } diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java index 68fe7b2cdcb..db96a0ba216 100644 --- a/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java @@ -51,6 +51,9 @@ public abstract class CoprocessorHost { "hbase.coprocessor.region.classes"; public static final String MASTER_COPROCESSOR_CONF_KEY = "hbase.coprocessor.master.classes"; + public static final String WAL_COPROCESSOR_CONF_KEY = + "hbase.coprocessor.wal.classes"; + private static final Log LOG = LogFactory.getLog(CoprocessorHost.class); /** Ordered set of loaded coprocessors with lock */ protected final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock(); diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 42356c92c58..fe8775b97d3 100644 --- a/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coprocessor; import java.util.List; import java.util.Map; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -29,6 +30,8 @@ import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import 
org.apache.hadoop.hbase.regionserver.wal.WALEdit; import java.io.IOException; @@ -529,4 +532,30 @@ public interface RegionObserver extends Coprocessor { public void postScannerClose(final RegionCoprocessorEnvironment e, final InternalScanner s) throws IOException; + + /** + * Called before a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit} + * is replayed for this region. + * + * @param env + * @param info + * @param logKey + * @param logEdit + * @throws IOException + */ + void preWALRestore(final RegionCoprocessorEnvironment env, + HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException; + + /** + * Called after a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit} + * is replayed for this region. + * + * @param env + * @param info + * @param logKey + * @param logEdit + * @throws IOException + */ + void postWALRestore(final RegionCoprocessorEnvironment env, + HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException; } diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java new file mode 100644 index 00000000000..6580c2c8dc8 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java @@ -0,0 +1,28 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import org.apache.hadoop.hbase.regionserver.wal.HLog; + +public interface WALCoprocessorEnvironment extends CoprocessorEnvironment { + /** @return reference to the write-ahead log */ + public HLog getWAL(); +} diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java new file mode 100644 index 00000000000..7a34d18a9fc --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java @@ -0,0 +1,67 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; + +import java.io.IOException; + +/** + * Defines coprocessor hooks for interacting with operations on the + * {@link org.apache.hadoop.hbase.regionserver.wal.HLog}. It gives coprocessors + * a way to observe, rewrite, or skip WALEdits as they are being written to the + * WAL. + * + * {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} provides the + * corresponding hooks for WALEdits in the region context during reconstruction. + */ +public interface WALObserver extends Coprocessor { + + /** + * Called before a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit} + * is written to the WAL. + * + * @param env + * @param info + * @param logKey + * @param logEdit + * @return true if default behavior should be bypassed, false otherwise + * @throws IOException + */ + boolean preWALWrite(CoprocessorEnvironment env, + HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException; + + /** + * Called after a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit} + * is written to the WAL. + * + * @param env + * @param info + * @param logKey + * @param logEdit + * @throws IOException + */ + void postWALWrite(CoprocessorEnvironment env, + HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException; +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index d64e19b8e55..fb6edfa003c 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2009,6 +2009,16 @@ public class HRegion implements HeapSize { // , Writable{ while ((entry = reader.next()) != null) { HLogKey key = entry.getKey(); WALEdit val = entry.getEdit(); + + // Start coprocessor replay here. The coprocessor is invoked per WALEdit + // instead of per KeyValue. + if (coprocessorHost != null) { + if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) { + // if the coprocessor bypasses this log entry, skip it + continue; + } + } + if (firstSeqIdInLog == -1) { firstSeqIdInLog = key.getLogSeqNum(); } @@ -2046,6 +2056,10 @@ public class HRegion implements HeapSize { // , Writable{ } if (flush) internalFlushcache(null, currentEditSeqId); + if (coprocessorHost != null) { + coprocessorHost.postWALRestore(this.getRegionInfo(), key, val); + } + + // Every 'interval' edits, tell the reporter we're making progress. // Have seen 60k edits taking 3minutes to complete. 
if (reporter != null && (editsCount % interval) == 0) { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 7c4e1a1a1f1..cb8a6f91a90 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.client.coprocessor.Batch; @@ -36,6 +37,8 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.coprocessor.*; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.util.StringUtils; @@ -1004,4 +1007,56 @@ public class RegionCoprocessorHost coprocessorLock.readLock().unlock(); } } + + /** + * @param info + * @param logKey + * @param logEdit + * @return true if default behavior should be bypassed, false otherwise + * @throws IOException + */ + public boolean preWALRestore(HRegionInfo info, HLogKey logKey, + WALEdit logEdit) throws IOException { + try { + boolean bypass = false; + coprocessorLock.readLock().lock(); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).preWALRestore(env, info, logKey, + logEdit); + } + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } + } + return bypass; + } finally { + coprocessorLock.readLock().unlock(); + } + } + + /** + * @param info + * @param logKey + * @param logEdit + * @throws IOException + */ + public void postWALRestore(HRegionInfo info, HLogKey logKey, + WALEdit logEdit) throws IOException { + try { + coprocessorLock.readLock().lock(); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).postWALRestore(env, info, + logKey, logEdit); + } + if (env.shouldComplete()) { + break; + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index f06d263643f..7569992535f 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -135,6 +135,8 @@ public class HLog implements Syncable { private static Class logWriterClass; private static Class logReaderClass; + private WALCoprocessorHost coprocessorHost; + static void resetLogReaderClass() { HLog.logReaderClass = null; } @@ -400,6 +402,7 @@ public class HLog implements Syncable { logSyncerThread = new LogSyncer(this.optionalFlushInterval); Threads.setDaemonThreadRunning(logSyncerThread, Thread.currentThread().getName() + ".logSyncer"); + coprocessorHost = new WALCoprocessorHost(this, conf); } public void registerWALActionsListener (final WALObserver listener) { @@ -1074,8 
+1077,13 @@ public class HLog implements Syncable { } try { long now = System.currentTimeMillis(); - this.writer.append(new HLog.Entry(logKey, logEdit)); + // coprocessor hook: + if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) { + // if not bypassed: + this.writer.append(new HLog.Entry(logKey, logEdit)); + } long took = System.currentTimeMillis() - now; + coprocessorHost.postWALWrite(info, logKey, logEdit); writeTime += took; writeOps++; if (took > 1000) { @@ -1444,6 +1452,13 @@ public class HLog implements Syncable { logSplitter.splitLog(); } + /** + * @return Coprocessor host. + */ + public WALCoprocessorHost getCoprocessorHost() { + return coprocessorHost; + } + /** * Pass one or more log file names and it will either dump out a text version * on stdout or split the specified log files. diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java new file mode 100644 index 00000000000..6885bc0afe2 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java @@ -0,0 +1,142 @@ + +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.coprocessor.*; +import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.conf.Configuration; + +/** + * Implements the coprocessor environment and runtime support for coprocessors + * loaded within a {@link HLog}. 
+ */ +public class WALCoprocessorHost + extends CoprocessorHost { + + private static final Log LOG = LogFactory.getLog(WALCoprocessorHost.class); + + /** + * Encapsulation of the environment of each coprocessor + */ + static class WALEnvironment extends CoprocessorHost.Environment + implements WALCoprocessorEnvironment { + + private HLog wal; + + @Override + public HLog getWAL() { + return wal; + } + + /** + * Constructor + * @param implClass the coprocessor implementation class + * @param impl the coprocessor instance + * @param priority chaining priority + * @param hlog the write-ahead log + */ + public WALEnvironment(Class implClass, final Coprocessor impl, + Coprocessor.Priority priority, final HLog hlog) { + super(impl, priority); + this.wal = hlog; + } + } + + HLog wal; + /** + * Constructor + * @param log the write-ahead log + * @param conf the configuration + */ + public WALCoprocessorHost(final HLog log, final Configuration conf) { + this.wal = log; + // load system default coprocessors from configuration. + loadSystemCoprocessors(conf, WAL_COPROCESSOR_CONF_KEY); + } + + @Override + public WALEnvironment createEnvironment(Class implClass, + Coprocessor instance, Priority priority) { + return new WALEnvironment(implClass, instance, priority, this.wal); + } + + /** + * @param info + * @param logKey + * @param logEdit + * @return true if default behavior should be bypassed, false otherwise + * @throws IOException + */ + public boolean preWALWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) + throws IOException { + try { + boolean bypass = false; + coprocessorLock.readLock().lock(); + for (WALEnvironment env: coprocessors) { + if (env.getInstance() instanceof + org.apache.hadoop.hbase.coprocessor.WALObserver) { + ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()). + preWALWrite(env, info, logKey, logEdit); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } + } + } + return bypass; + } finally { + coprocessorLock.readLock().unlock(); + } + } + + /** + * @param info + * @param logKey + * @param logEdit + * @throws IOException + */ + public void postWALWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) + throws IOException { + try { + coprocessorLock.readLock().lock(); + for (WALEnvironment env: coprocessors) { + if (env.getInstance() instanceof + org.apache.hadoop.hbase.coprocessor.WALObserver) { + ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()). + postWALWrite(env, info, logKey, logEdit); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } +} diff --git a/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java b/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java new file mode 100644 index 00000000000..834283f12cc --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java @@ -0,0 +1,162 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.List; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; + +/** + * Class for testing the WAL coprocessor extension. The WAL write hooks are + * defined in WALObserver, while the WAL restore hooks are in RegionObserver. + * + * It monitors WAL writes and restores, modifying the passed-in WALEdit, i.e., + * ignoring specified columns when writing and adding a KeyValue. On the other + * hand, it checks whether the ignored column is still in the WAL when it is + * restored at region reconstruction. + */ +public class SampleRegionWALObserver extends BaseRegionObserverCoprocessor +implements WALObserver { + + private static final Log LOG = LogFactory.getLog(SampleRegionWALObserver.class); + + private byte[] tableName; + private byte[] row; + private byte[] ignoredFamily; + private byte[] ignoredQualifier; + private byte[] addedFamily; + private byte[] addedQualifier; + private byte[] changedFamily; + private byte[] changedQualifier; + + private boolean preWALWriteCalled = false; + private boolean postWALWriteCalled = false; + private boolean preWALRestoreCalled = false; + private boolean postWALRestoreCalled = false; + + /** + * Set test values: the table name and row, a column which will be ignored, + * a column whose value will be changed, and a column which will be added to + * the WAL. + */ + public void setTestValues(byte[] tableName, byte[] row, byte[] igf, byte[] igq, + byte[] chf, byte[] chq, byte[] addf, byte[] addq) { + this.row = row; + this.tableName = tableName; + this.ignoredFamily = igf; + this.ignoredQualifier = igq; + this.addedFamily = addf; + this.addedQualifier = addq; + this.changedFamily = chf; + this.changedQualifier = chq; + } + + + @Override + public void postWALWrite(CoprocessorEnvironment env, HRegionInfo info, + HLogKey logKey, WALEdit logEdit) throws IOException { + postWALWriteCalled = true; + } + + @Override + public boolean preWALWrite(CoprocessorEnvironment env, HRegionInfo info, + HLogKey logKey, WALEdit logEdit) throws IOException { + boolean bypass = false; + // check whether the table name matches. + if (!Arrays.equals(HRegionInfo.getTableName(info.getRegionName()), this.tableName)) { + return bypass; + } + preWALWriteCalled = true; + // here we're going to remove one keyvalue from the WALEdit, and add + // another one to it. + List<KeyValue> kvs = logEdit.getKeyValues(); + KeyValue deletedKV = null; + for (KeyValue kv : kvs) { + // assume only one kv from the WALEdit matches. 
+ byte[] family = kv.getFamily(); + byte[] qualifier = kv.getQualifier(); + + if (Arrays.equals(family, ignoredFamily) && + Arrays.equals(qualifier, ignoredQualifier)) { + LOG.debug("Found the KeyValue from WALEdit which should be ignored."); + deletedKV = kv; + } + if (Arrays.equals(family, changedFamily) && + Arrays.equals(qualifier, changedQualifier)) { + LOG.debug("Found the KeyValue from WALEdit which should be changed."); + kv.getBuffer()[kv.getValueOffset()] += 1; + } + } + kvs.add(new KeyValue(row, addedFamily, addedQualifier)); + if (deletedKV != null) { + LOG.debug("About to delete a KeyValue from WALEdit."); + kvs.remove(deletedKV); + } + return bypass; + } + + /** + * Triggered before {@link org.apache.hadoop.hbase.regionserver.HRegion} replays a + * WAL entry during region reconstruction. + */ + @Override + public void preWALRestore(RegionCoprocessorEnvironment env, HRegionInfo info, + HLogKey logKey, WALEdit logEdit) throws IOException { + preWALRestoreCalled = true; + } + + /** + * Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} replays a + * WAL entry during region reconstruction. + */ + @Override + public void postWALRestore(RegionCoprocessorEnvironment env, + HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { + postWALRestoreCalled = true; + } + + public boolean isPreWALWriteCalled() { + return preWALWriteCalled; + } + + public boolean isPostWALWriteCalled() { + return postWALWriteCalled; + } + + public boolean isPreWALRestoreCalled() { + LOG.debug(SampleRegionWALObserver.class.getName() + + ".isPreWALRestoreCalled is called."); + return preWALRestoreCalled; + } + + public boolean isPostWALRestoreCalled() { + LOG.debug(SampleRegionWALObserver.class.getName() + + ".isPostWALRestoreCalled is called."); + return postWALRestoreCalled; + } +} diff --git a/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index 4d86294882a..b6e595fd39c 100644 --- a/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -67,6 +67,8 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor { boolean hadPostGetClosestRowBefore = false; boolean hadPreIncrement = false; boolean hadPostIncrement = false; + boolean hadPreWALRestored = false; + boolean hadPostWALRestored = false; @Override public void preOpen(RegionCoprocessorEnvironment e) { @@ -333,4 +335,12 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor { boolean hadPostIncrement() { return hadPostIncrement; } + + boolean hadPreWALRestored() { + return hadPreWALRestored; + } + + boolean hadPostWALRestored() { + return hadPostWALRestored; + } } diff --git a/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALCoprocessors.java b/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALCoprocessors.java new file mode 100644 index 00000000000..27c38f9e7c6 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALCoprocessors.java @@ -0,0 +1,373 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; +import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.*; + +/** + * Tests invocation of the {@link org.apache.hadoop.hbase.coprocessor.WALObserver} + * interface hooks at all appropriate times during WAL writing and replay. 
+ */ +public class TestWALCoprocessors { + private static final Log LOG = LogFactory.getLog(TestWALCoprocessors.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static byte[] TEST_TABLE = Bytes.toBytes("observedTable"); + private static byte[][] TEST_FAMILY = { Bytes.toBytes("fam1"), + Bytes.toBytes("fam2"), + Bytes.toBytes("fam3"), + }; + private static byte[][] TEST_QUALIFIER = { Bytes.toBytes("q1"), + Bytes.toBytes("q2"), + Bytes.toBytes("q3"), + }; + private static byte[][] TEST_VALUE = { Bytes.toBytes("v1"), + Bytes.toBytes("v2"), + Bytes.toBytes("v3"), + }; + private static byte[] TEST_ROW = Bytes.toBytes("testRow"); + + private Configuration conf; + private FileSystem fs; + private Path dir; + private MiniDFSCluster cluster; + private Path hbaseRootDir; + private Path oldLogDir; + private Path logDir; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, + SampleRegionWALObserver.class.getName()); + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + SampleRegionWALObserver.class.getName()); + conf.setBoolean("dfs.support.append", true); + conf.setInt("dfs.client.block.recovery.retries", 2); + conf.setInt("hbase.regionserver.flushlogentries", 1); + + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.setNameNodeNameSystemLeasePeriod(100, 10000); + Path hbaseRootDir = + TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); + LOG.info("hbase.rootdir=" + hbaseRootDir); + conf.set(HConstants.HBASE_DIR, hbaseRootDir.toString()); + } + + @AfterClass + public static void teardownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + //this.cluster = TEST_UTIL.getDFSCluster(); + this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); + this.hbaseRootDir = new Path(conf.get(HConstants.HBASE_DIR)); + this.dir = new Path(this.hbaseRootDir, TestWALCoprocessors.class.getName()); + this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); + this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME); + + if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { + TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); + } + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); + } + + /** + * Test WAL write behavior with WALObserver. The coprocessor monitors each + * WALEdit written to the WAL, and ignores, modifies, or adds KeyValues to it. + */ + @Test + public void testWALCoprocessorWriteToWAL() throws Exception { + HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE)); + Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE)); + deleteDir(basedir); + fs.mkdirs(new Path(basedir, hri.getEncodedName())); + + HLog log = new HLog(this.fs, this.dir, this.oldLogDir, this.conf); + SampleRegionWALObserver cp = getCoprocessor(log); + + // TEST_FAMILY[0] shall be removed from WALEdit. + // TEST_FAMILY[1] value shall be changed. + // TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put. 
+ cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0], + TEST_FAMILY[1], TEST_QUALIFIER[1], + TEST_FAMILY[2], TEST_QUALIFIER[2]); + + assertFalse(cp.isPreWALWriteCalled()); + assertFalse(cp.isPostWALWriteCalled()); + + // TEST_FAMILY[2] is not in the put, however it shall be added by the tested + // coprocessor. + // Use a Put to create familyMap. + Put p = creatPutWith2Families(TEST_ROW); + + Map<byte[], List<KeyValue>> familyMap = p.getFamilyMap(); + WALEdit edit = new WALEdit(); + addFamilyMapToWALEdit(familyMap, edit); + + boolean foundFamily0 = false; + boolean foundFamily2 = false; + boolean modifiedFamily1 = false; + + List<KeyValue> kvs = edit.getKeyValues(); + + for (KeyValue kv : kvs) { + if (Arrays.equals(kv.getFamily(), TEST_FAMILY[0])) { + foundFamily0 = true; + } + if (Arrays.equals(kv.getFamily(), TEST_FAMILY[2])) { + foundFamily2 = true; + } + if (Arrays.equals(kv.getFamily(), TEST_FAMILY[1])) { + if (!Arrays.equals(kv.getValue(), TEST_VALUE[1])) { + modifiedFamily1 = true; + } + } + } + assertTrue(foundFamily0); + assertFalse(foundFamily2); + assertFalse(modifiedFamily1); + + // this is where the WAL write coprocessor hooks fire. + long now = EnvironmentEdgeManager.currentTimeMillis(); + log.append(hri, hri.getTableDesc().getName(), edit, now); + + // the edit shall have been changed now by the coprocessor. + foundFamily0 = false; + foundFamily2 = false; + modifiedFamily1 = false; + for (KeyValue kv : kvs) { + if (Arrays.equals(kv.getFamily(), TEST_FAMILY[0])) { + foundFamily0 = true; + } + if (Arrays.equals(kv.getFamily(), TEST_FAMILY[2])) { + foundFamily2 = true; + } + if (Arrays.equals(kv.getFamily(), TEST_FAMILY[1])) { + if (!Arrays.equals(kv.getValue(), TEST_VALUE[1])) { + modifiedFamily1 = true; + } + } + } + assertFalse(foundFamily0); + assertTrue(foundFamily2); + assertTrue(modifiedFamily1); + + assertTrue(cp.isPreWALWriteCalled()); + assertTrue(cp.isPostWALWriteCalled()); + } + + /** + * Test WAL replay behavior with WALObserver. + */ + @Test + public void testWALCoprocessorReplay() throws Exception { + // WAL replay is handled at HRegion::replayRecoveredEdits(), which is + // ultimately called by HRegion::initialize() + byte[] tableName = Bytes.toBytes("testWALCoprocessorReplay"); + + final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(tableName)); + final Path basedir = new Path(this.hbaseRootDir, Bytes.toString(tableName)); + deleteDir(basedir); + fs.mkdirs(new Path(basedir, hri.getEncodedName())); + + //HLog wal = new HLog(this.fs, this.dir, this.oldLogDir, this.conf); + HLog wal = createWAL(this.conf); + //Put p = creatPutWith2Families(TEST_ROW); + WALEdit edit = new WALEdit(); + long now = EnvironmentEdgeManager.currentTimeMillis(); + //addFamilyMapToWALEdit(p.getFamilyMap(), edit); + final int countPerFamily = 1000; + for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) { + addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily, + EnvironmentEdgeManager.getDelegate(), wal); + } + wal.append(hri, tableName, edit, now); + // sync to fs. + wal.sync(); + + final Configuration newConf = HBaseConfiguration.create(this.conf); + User user = HBaseTestingUtility.getDifferentUser(newConf, + ".replay.wal.secondtime"); + user.runAs(new PrivilegedExceptionAction() { + public Object run() throws Exception { + runWALSplit(newConf); + FileSystem newFS = FileSystem.get(newConf); + // Make a new wal for new region open. 
+ HLog wal2 = createWAL(newConf); + HRegion region2 = new HRegion(basedir, wal2, FileSystem.get(newConf), + newConf, hri, TEST_UTIL.getHBaseCluster().getRegionServer(0)); + long seqid2 = region2.initialize(); + + SampleRegionWALObserver cp2 = + (SampleRegionWALObserver)region2.getCoprocessorHost().findCoprocessor( + SampleRegionWALObserver.class.getName()); + // TODO: asserting here is problematic. + assertNotNull(cp2); + assertTrue(cp2.isPreWALRestoreCalled()); + assertTrue(cp2.isPostWALRestoreCalled()); + region2.close(); + wal2.closeAndDelete(); + return null; + } + }); + } + /** + * Test to see CP loaded successfully or not. There is a duplication + * at TestHLog, but the purpose of that one is to see whether the loaded + * CP will impact existing HLog tests or not. + */ + @Test + public void testWALCoprocessorLoaded() throws Exception { + HLog log = new HLog(fs, dir, oldLogDir, conf); + assertNotNull(getCoprocessor(log)); + } + + private SampleRegionWALObserver getCoprocessor(HLog wal) throws Exception { + WALCoprocessorHost host = wal.getCoprocessorHost(); + Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName()); + return (SampleRegionWALObserver)c; + } + + /* + * Creates an HRI around an HTD that has tableName and three + * column families named. + * @param tableName Name of table to use when we create HTableDescriptor. + */ + private HRegionInfo createBasic3FamilyHRegionInfo(final String tableName) { + HTableDescriptor htd = new HTableDescriptor(tableName); + + for (int i = 0; i < TEST_FAMILY.length; i++ ) { + HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]); + htd.addFamily(a); + } + return new HRegionInfo(htd, null, null, false); + } + + /* + * @param p Directory to cleanup + */ + private void deleteDir(final Path p) throws IOException { + if (this.fs.exists(p)) { + if (!this.fs.delete(p, true)) { + throw new IOException("Failed remove of " + p); + } + } + } + + private Put creatPutWith2Families(byte[] row) throws IOException { + Put p = new Put(row); + for (int i = 0; i < TEST_FAMILY.length-1; i++ ) { + p.add(TEST_FAMILY[i], TEST_QUALIFIER[i], + TEST_VALUE[i]); + } + return p; + } + + /** + * Copied from HRegion. 
+ * + * @param familyMap map of family->edits + * @param walEdit the destination entry to append into + */ + private void addFamilyMapToWALEdit(Map<byte[], List<KeyValue>> familyMap, + WALEdit walEdit) { + for (List<KeyValue> edits : familyMap.values()) { + for (KeyValue kv : edits) { + walEdit.add(kv); + } + } + } + private Path runWALSplit(final Configuration c) throws IOException { + FileSystem fs = FileSystem.get(c); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c, + this.hbaseRootDir, this.logDir, this.oldLogDir, fs); + List<Path> splits = logSplitter.splitLog(); + // Split should generate only 1 file since there's only 1 region + assertEquals(1, splits.size()); + // Make sure the file exists + assertTrue(fs.exists(splits.get(0))); + LOG.info("Split file=" + splits.get(0)); + return splits.get(0); + } + private HLog createWAL(final Configuration c) throws IOException { + HLog wal = new HLog(FileSystem.get(c), logDir, oldLogDir, c); + return wal; + } + private void addWALEdits (final byte [] tableName, final HRegionInfo hri, + final byte [] rowName, final byte [] family, + final int count, EnvironmentEdge ee, final HLog wal) + throws IOException { + String familyStr = Bytes.toString(family); + for (int j = 0; j < count; j++) { + byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j)); + byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j)); + WALEdit edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, qualifierBytes, + ee.currentTimeMillis(), columnBytes)); + wal.append(hri, tableName, edit, ee.currentTimeMillis()); + } + } +} + diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 08ba8cbf054..64519375491 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; import java.io.IOException; import java.lang.reflect.Method; @@ -45,6 +46,9 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.coprocessor.Coprocessor; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction; @@ -107,6 +111,8 @@ public class TestHLog { .setInt("ipc.client.connect.max.retries", 1); TEST_UTIL.getConfiguration().setInt( "dfs.client.block.recovery.retries", 1); + TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, + SampleRegionWALObserver.class.getName()); TEST_UTIL.startMiniCluster(3); conf = TEST_UTIL.getConfiguration(); @@ -640,6 +646,18 @@ public class TestHLog { assertEquals(0, log.getNumLogFiles()); } + /** + * A loaded WAL coprocessor won't break existing HLog test cases. + */ + @Test + public void testWALCoprocessorLoaded() throws Exception { + // test to see whether the coprocessor is loaded or not. 
+ HLog log = new HLog(fs, dir, oldLogDir, conf); + WALCoprocessorHost host = log.getCoprocessorHost(); + Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName()); + assertNotNull(c); + } + private void addEdits(HLog log, HRegionInfo hri, byte [] tableName, int times) throws IOException { final byte [] row = Bytes.toBytes("row");
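Usage note: the patch introduces a third system coprocessor key alongside the region and master ones, hbase.coprocessor.wal.classes (CoprocessorHost.WAL_COPROCESSOR_CONF_KEY), which WALCoprocessorHost reads via loadSystemCoprocessors() when the HLog is constructed. A WAL observer is therefore enabled through configuration, typically in hbase-site.xml on the region servers; the sketch below sets the same key programmatically and assumes a hypothetical observer class com.example.AuditWALObserver.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;

public class EnableWALObserver {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Same key the WALCoprocessorHost constructor reads ("hbase.coprocessor.wal.classes").
    // The named class must be on the region server classpath; it is instantiated once per HLog.
    conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, "com.example.AuditWALObserver");
  }
}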
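A minimal write-side observer sketch against the WALObserver interface added by this patch. The class name, watched table, and audit logic are illustrative only; like SampleRegionWALObserver above, it extends BaseRegionObserverCoprocessor for the coprocessor plumbing and implements WALObserver for the write hooks.

import java.io.IOException;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserverCoprocessor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.WALObserver;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

/** Logs every WAL append for one table; a sketch, not production code. */
public class AuditWALObserver extends BaseRegionObserverCoprocessor
    implements WALObserver {

  private static final Log LOG = LogFactory.getLog(AuditWALObserver.class);
  private static final byte[] WATCHED_TABLE = Bytes.toBytes("observedTable");

  @Override
  public boolean preWALWrite(CoprocessorEnvironment env, HRegionInfo info,
      HLogKey logKey, WALEdit logEdit) throws IOException {
    // Returning false asks for the default append; note that in this patch the
    // host decides bypass from env.shouldBypass() rather than this return value.
    return false;
  }

  @Override
  public void postWALWrite(CoprocessorEnvironment env, HRegionInfo info,
      HLogKey logKey, WALEdit logEdit) throws IOException {
    if (Arrays.equals(HRegionInfo.getTableName(info.getRegionName()), WATCHED_TABLE)) {
      LOG.info("Appended " + logEdit.getKeyValues().size() + " KVs for region "
          + info.getRegionNameAsString());
    }
  }
}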
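The region-side restore hooks work the same way: HRegion calls preWALRestore before a recovered entry is applied and postWALRestore afterwards, as the replayRecoveredEdits hunk earlier in the patch shows. A sketch of a RegionObserver that merely counts replayed entries (class name illustrative):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserverCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

/** Counts WAL entries replayed through this region during recovery; illustrative only. */
public class ReplayCountingObserver extends BaseRegionObserverCoprocessor {

  private final AtomicLong replayed = new AtomicLong();

  @Override
  public void preWALRestore(RegionCoprocessorEnvironment env, HRegionInfo info,
      HLogKey logKey, WALEdit logEdit) throws IOException {
    // Runs before the edit reaches the memstore; mutating logEdit here changes
    // what the region replays, just as SampleRegionWALObserver mutates edits on
    // the write path.
  }

  @Override
  public void postWALRestore(RegionCoprocessorEnvironment env, HRegionInfo info,
      HLogKey logKey, WALEdit logEdit) throws IOException {
    replayed.incrementAndGet();
  }

  public long getReplayedCount() {
    return replayed.get();
  }
}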
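Finally, a small check mirroring testWALCoprocessorLoaded above, useful for verifying from server-side code that the configured observer was actually picked up by the WAL. The helper class is hypothetical and the HLog is assumed to come from the caller.

import org.apache.hadoop.hbase.coprocessor.Coprocessor;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;

public final class WALObserverCheck {
  private WALObserverCheck() {
  }

  /** Returns true if the named observer class was loaded by the given WAL. */
  public static boolean isLoaded(HLog log, String observerClassName) {
    WALCoprocessorHost host = log.getCoprocessorHost(); // accessor added by this patch
    Coprocessor c = host.findCoprocessor(observerClassName);
    return c != null;
  }
}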