HBASE-3257 Coprocessors: Extend server side API to include HLog operations
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1068206 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c479f0a984
commit
ba3426321a
13
CHANGES.txt
13
CHANGES.txt
|
@ -49,8 +49,6 @@ Release 0.91.0 - Unreleased
|
||||||
|
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via
|
|
||||||
Andrew Purtell)
|
|
||||||
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
|
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
|
||||||
HBASE-3292 Expose block cache hit/miss/evict counts into region server
|
HBASE-3292 Expose block cache hit/miss/evict counts into region server
|
||||||
metrics
|
metrics
|
||||||
|
@ -60,9 +58,6 @@ Release 0.91.0 - Unreleased
|
||||||
HBASE-1861 Multi-Family support for bulk upload tools
|
HBASE-1861 Multi-Family support for bulk upload tools
|
||||||
HBASE-3308 SplitTransaction.splitStoreFiles slows splits a lot
|
HBASE-3308 SplitTransaction.splitStoreFiles slows splits a lot
|
||||||
HBASE-3328 Added Admin API to specify explicit split points
|
HBASE-3328 Added Admin API to specify explicit split points
|
||||||
HBASE-3345 Coprocessors: Allow observers to completely override base
|
|
||||||
function
|
|
||||||
HBASE-3260 Coprocessors: Add explicit lifecycle management
|
|
||||||
HBASE-3377 Upgrade Jetty to 6.1.26
|
HBASE-3377 Upgrade Jetty to 6.1.26
|
||||||
HBASE-3387 Pair does not deep check arrays for equality
|
HBASE-3387 Pair does not deep check arrays for equality
|
||||||
(Jesse Yates via Stack)
|
(Jesse Yates via Stack)
|
||||||
|
@ -77,16 +72,24 @@ Release 0.91.0 - Unreleased
|
||||||
|
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via
|
||||||
|
Andrew Purtell)
|
||||||
HBASE-3287 Add option to cache blocks on hfile write and evict blocks on
|
HBASE-3287 Add option to cache blocks on hfile write and evict blocks on
|
||||||
hfile close
|
hfile close
|
||||||
HBASE-3335 Add BitComparator for filtering (Nathaniel Cook via Stack)
|
HBASE-3335 Add BitComparator for filtering (Nathaniel Cook via Stack)
|
||||||
|
HBASE-3260 Coprocessors: Add explicit lifecycle management
|
||||||
HBASE-3256 Coprocessors: Coprocessor host and observer for HMaster
|
HBASE-3256 Coprocessors: Coprocessor host and observer for HMaster
|
||||||
|
HBASE-3345 Coprocessors: Allow observers to completely override base
|
||||||
|
function
|
||||||
HBASE-3448 RegionSplitter, utility class to manually split tables
|
HBASE-3448 RegionSplitter, utility class to manually split tables
|
||||||
HBASE-2824 A filter that randomly includes rows based on a configured
|
HBASE-2824 A filter that randomly includes rows based on a configured
|
||||||
chance (Ferdy via Andrew Purtell)
|
chance (Ferdy via Andrew Purtell)
|
||||||
HBASE-3455 Add memstore-local allocation buffers to combat heap
|
HBASE-3455 Add memstore-local allocation buffers to combat heap
|
||||||
fragmentation in the region server. Enabled by default as of
|
fragmentation in the region server. Enabled by default as of
|
||||||
0.91
|
0.91
|
||||||
|
HBASE-3257 Coprocessors: Extend server side API to include HLog operations
|
||||||
|
(Mingjie Lai via Andrew Purtell)
|
||||||
|
|
||||||
|
|
||||||
Release 0.90.1 - Unreleased
|
Release 0.90.1 - Unreleased
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coprocessor;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
@ -28,6 +29,8 @@ import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -222,4 +225,14 @@ public abstract class BaseRegionObserverCoprocessor implements RegionObserver {
|
||||||
public void postScannerClose(final RegionCoprocessorEnvironment e,
|
public void postScannerClose(final RegionCoprocessorEnvironment e,
|
||||||
final InternalScanner s) throws IOException {
|
final InternalScanner s) throws IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preWALRestore(RegionCoprocessorEnvironment env, HRegionInfo info,
|
||||||
|
HLogKey logKey, WALEdit logEdit) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postWALRestore(RegionCoprocessorEnvironment env,
|
||||||
|
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,6 +51,9 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
||||||
"hbase.coprocessor.region.classes";
|
"hbase.coprocessor.region.classes";
|
||||||
public static final String MASTER_COPROCESSOR_CONF_KEY =
|
public static final String MASTER_COPROCESSOR_CONF_KEY =
|
||||||
"hbase.coprocessor.master.classes";
|
"hbase.coprocessor.master.classes";
|
||||||
|
public static final String WAL_COPROCESSOR_CONF_KEY =
|
||||||
|
"hbase.coprocessor.wal.classes";
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
|
private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
|
||||||
/** Ordered set of loaded coprocessors with lock */
|
/** Ordered set of loaded coprocessors with lock */
|
||||||
protected final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock();
|
protected final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock();
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coprocessor;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
@ -29,6 +30,8 @@ import org.apache.hadoop.hbase.client.Increment;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -529,4 +532,30 @@ public interface RegionObserver extends Coprocessor {
|
||||||
public void postScannerClose(final RegionCoprocessorEnvironment e,
|
public void postScannerClose(final RegionCoprocessorEnvironment e,
|
||||||
final InternalScanner s)
|
final InternalScanner s)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called before a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
|
||||||
|
* replayed for this region.
|
||||||
|
*
|
||||||
|
* @param env
|
||||||
|
* @param info
|
||||||
|
* @param logKey
|
||||||
|
* @param logEdit
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void preWALRestore(final RegionCoprocessorEnvironment env,
|
||||||
|
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called after a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
|
||||||
|
* replayed for this region.
|
||||||
|
*
|
||||||
|
* @param env
|
||||||
|
* @param info
|
||||||
|
* @param logKey
|
||||||
|
* @param logEdit
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void postWALRestore(final RegionCoprocessorEnvironment env,
|
||||||
|
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||||
|
|
||||||
|
public interface WALCoprocessorEnvironment extends CoprocessorEnvironment {
|
||||||
|
/** @return reference to the region server services */
|
||||||
|
public HLog getWAL();
|
||||||
|
}
|
|
@ -0,0 +1,67 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2010 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* It's provided to have a way for coprocessors to observe, rewrite,
|
||||||
|
* or skip WALEdits as they are being written to the WAL.
|
||||||
|
*
|
||||||
|
* {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} provides
|
||||||
|
* hooks for adding logic for WALEdits in the region context during reconstruction,
|
||||||
|
*
|
||||||
|
* Defines coprocessor hooks for interacting with operations on the
|
||||||
|
* {@link org.apache.hadoop.hbase.regionserver.wal.HLog}.
|
||||||
|
*/
|
||||||
|
public interface WALObserver extends Coprocessor {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called before a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
|
||||||
|
* is writen to WAL.
|
||||||
|
*
|
||||||
|
* @param env
|
||||||
|
* @param info
|
||||||
|
* @param logKey
|
||||||
|
* @param logEdit
|
||||||
|
* @return true if default behavior should be bypassed, false otherwise
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
boolean preWALWrite(CoprocessorEnvironment env,
|
||||||
|
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called after a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
|
||||||
|
* is writen to WAL.
|
||||||
|
*
|
||||||
|
* @param env
|
||||||
|
* @param info
|
||||||
|
* @param logKey
|
||||||
|
* @param logEdit
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void postWALWrite(CoprocessorEnvironment env,
|
||||||
|
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
|
||||||
|
}
|
|
@ -2009,6 +2009,16 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
while ((entry = reader.next()) != null) {
|
while ((entry = reader.next()) != null) {
|
||||||
HLogKey key = entry.getKey();
|
HLogKey key = entry.getKey();
|
||||||
WALEdit val = entry.getEdit();
|
WALEdit val = entry.getEdit();
|
||||||
|
|
||||||
|
// Start coprocessor replay here. The coprocessor is for each WALEdit
|
||||||
|
// instead of a KeyValue.
|
||||||
|
if (coprocessorHost != null) {
|
||||||
|
if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
|
||||||
|
// if bypass this log entry, ignore it ...
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (firstSeqIdInLog == -1) {
|
if (firstSeqIdInLog == -1) {
|
||||||
firstSeqIdInLog = key.getLogSeqNum();
|
firstSeqIdInLog = key.getLogSeqNum();
|
||||||
}
|
}
|
||||||
|
@ -2046,6 +2056,10 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
}
|
}
|
||||||
if (flush) internalFlushcache(null, currentEditSeqId);
|
if (flush) internalFlushcache(null, currentEditSeqId);
|
||||||
|
|
||||||
|
if (coprocessorHost != null) {
|
||||||
|
coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
|
||||||
|
}
|
||||||
|
|
||||||
// Every 'interval' edits, tell the reporter we're making progress.
|
// Every 'interval' edits, tell the reporter we're making progress.
|
||||||
// Have seen 60k edits taking 3minutes to complete.
|
// Have seen 60k edits taking 3minutes to complete.
|
||||||
if (reporter != null && (editsCount % interval) == 0) {
|
if (reporter != null && (editsCount % interval) == 0) {
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.client.*;
|
import org.apache.hadoop.hbase.client.*;
|
||||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||||
|
@ -36,6 +37,8 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
||||||
import org.apache.hadoop.hbase.coprocessor.*;
|
import org.apache.hadoop.hbase.coprocessor.*;
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
|
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.VersionInfo;
|
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
@ -1004,4 +1007,56 @@ public class RegionCoprocessorHost
|
||||||
coprocessorLock.readLock().unlock();
|
coprocessorLock.readLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param info
|
||||||
|
* @param logKey
|
||||||
|
* @param logEdit
|
||||||
|
* @return true if default behavior should be bypassed, false otherwise
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public boolean preWALRestore(HRegionInfo info, HLogKey logKey,
|
||||||
|
WALEdit logEdit) throws IOException {
|
||||||
|
try {
|
||||||
|
boolean bypass = false;
|
||||||
|
coprocessorLock.readLock().lock();
|
||||||
|
for (RegionEnvironment env: coprocessors) {
|
||||||
|
if (env.getInstance() instanceof RegionObserver) {
|
||||||
|
((RegionObserver)env.getInstance()).preWALRestore(env, info, logKey,
|
||||||
|
logEdit);
|
||||||
|
}
|
||||||
|
bypass |= env.shouldBypass();
|
||||||
|
if (env.shouldComplete()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return bypass;
|
||||||
|
} finally {
|
||||||
|
coprocessorLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param info
|
||||||
|
* @param logKey
|
||||||
|
* @param logEdit
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void postWALRestore(HRegionInfo info, HLogKey logKey,
|
||||||
|
WALEdit logEdit) throws IOException {
|
||||||
|
try {
|
||||||
|
coprocessorLock.readLock().lock();
|
||||||
|
for (RegionEnvironment env: coprocessors) {
|
||||||
|
if (env.getInstance() instanceof RegionObserver) {
|
||||||
|
((RegionObserver)env.getInstance()).postWALRestore(env, info,
|
||||||
|
logKey, logEdit);
|
||||||
|
}
|
||||||
|
if (env.shouldComplete()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
coprocessorLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -135,6 +135,8 @@ public class HLog implements Syncable {
|
||||||
private static Class<? extends Writer> logWriterClass;
|
private static Class<? extends Writer> logWriterClass;
|
||||||
private static Class<? extends Reader> logReaderClass;
|
private static Class<? extends Reader> logReaderClass;
|
||||||
|
|
||||||
|
private WALCoprocessorHost coprocessorHost;
|
||||||
|
|
||||||
static void resetLogReaderClass() {
|
static void resetLogReaderClass() {
|
||||||
HLog.logReaderClass = null;
|
HLog.logReaderClass = null;
|
||||||
}
|
}
|
||||||
|
@ -400,6 +402,7 @@ public class HLog implements Syncable {
|
||||||
logSyncerThread = new LogSyncer(this.optionalFlushInterval);
|
logSyncerThread = new LogSyncer(this.optionalFlushInterval);
|
||||||
Threads.setDaemonThreadRunning(logSyncerThread,
|
Threads.setDaemonThreadRunning(logSyncerThread,
|
||||||
Thread.currentThread().getName() + ".logSyncer");
|
Thread.currentThread().getName() + ".logSyncer");
|
||||||
|
coprocessorHost = new WALCoprocessorHost(this, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void registerWALActionsListener (final WALObserver listener) {
|
public void registerWALActionsListener (final WALObserver listener) {
|
||||||
|
@ -1074,8 +1077,13 @@ public class HLog implements Syncable {
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
|
// coprocessor hook:
|
||||||
|
if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
|
||||||
|
// if not bypassed:
|
||||||
this.writer.append(new HLog.Entry(logKey, logEdit));
|
this.writer.append(new HLog.Entry(logKey, logEdit));
|
||||||
|
}
|
||||||
long took = System.currentTimeMillis() - now;
|
long took = System.currentTimeMillis() - now;
|
||||||
|
coprocessorHost.postWALWrite(info, logKey, logEdit);
|
||||||
writeTime += took;
|
writeTime += took;
|
||||||
writeOps++;
|
writeOps++;
|
||||||
if (took > 1000) {
|
if (took > 1000) {
|
||||||
|
@ -1444,6 +1452,13 @@ public class HLog implements Syncable {
|
||||||
logSplitter.splitLog();
|
logSplitter.splitLog();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Coprocessor host.
|
||||||
|
*/
|
||||||
|
public WALCoprocessorHost getCoprocessorHost() {
|
||||||
|
return coprocessorHost;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pass one or more log file names and it will either dump out a text version
|
* Pass one or more log file names and it will either dump out a text version
|
||||||
* on <code>stdout</code> or split the specified log files.
|
* on <code>stdout</code> or split the specified log files.
|
||||||
|
|
|
@ -0,0 +1,142 @@
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Copyright 2010 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.*;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implements the coprocessor environment and runtime support for coprocessors
|
||||||
|
* loaded within a {@link HLog}.
|
||||||
|
*/
|
||||||
|
public class WALCoprocessorHost
|
||||||
|
extends CoprocessorHost<WALCoprocessorHost.WALEnvironment> {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(WALCoprocessorHost.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encapsulation of the environment of each coprocessor
|
||||||
|
*/
|
||||||
|
static class WALEnvironment extends CoprocessorHost.Environment
|
||||||
|
implements WALCoprocessorEnvironment {
|
||||||
|
|
||||||
|
private HLog wal;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HLog getWAL() {
|
||||||
|
return wal;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
* @param impl the coprocessor instance
|
||||||
|
* @param priority chaining priority
|
||||||
|
*/
|
||||||
|
public WALEnvironment(Class<?> implClass, final Coprocessor impl,
|
||||||
|
Coprocessor.Priority priority, final HLog hlog) {
|
||||||
|
super(impl, priority);
|
||||||
|
this.wal = hlog;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
HLog wal;
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
* @param region the region
|
||||||
|
* @param rsServices interface to available region server functionality
|
||||||
|
* @param conf the configuration
|
||||||
|
*/
|
||||||
|
public WALCoprocessorHost(final HLog log, final Configuration conf) {
|
||||||
|
this.wal = log;
|
||||||
|
// load system default cp's from configuration.
|
||||||
|
loadSystemCoprocessors(conf, WAL_COPROCESSOR_CONF_KEY);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WALEnvironment createEnvironment(Class<?> implClass,
|
||||||
|
Coprocessor instance, Priority priority) {
|
||||||
|
// TODO Auto-generated method stub
|
||||||
|
return new WALEnvironment(implClass, instance, priority, this.wal);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param info
|
||||||
|
* @param logKey
|
||||||
|
* @param logEdit
|
||||||
|
* @return true if default behavior should be bypassed, false otherwise
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public boolean preWALWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
boolean bypass = false;
|
||||||
|
coprocessorLock.readLock().lock();
|
||||||
|
for (WALEnvironment env: coprocessors) {
|
||||||
|
if (env.getInstance() instanceof
|
||||||
|
org.apache.hadoop.hbase.coprocessor.WALObserver) {
|
||||||
|
((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()).
|
||||||
|
preWALWrite(env, info, logKey, logEdit);
|
||||||
|
bypass |= env.shouldBypass();
|
||||||
|
if (env.shouldComplete()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return bypass;
|
||||||
|
} finally {
|
||||||
|
coprocessorLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param info
|
||||||
|
* @param logKey
|
||||||
|
* @param logEdit
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void postWALWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
coprocessorLock.readLock().lock();
|
||||||
|
for (WALEnvironment env: coprocessors) {
|
||||||
|
if (env.getInstance() instanceof
|
||||||
|
org.apache.hadoop.hbase.coprocessor.WALObserver) {
|
||||||
|
((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()).
|
||||||
|
postWALWrite(env, info, logKey, logEdit);
|
||||||
|
if (env.shouldComplete()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
coprocessorLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,162 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2010 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class for testing WAL coprocessor extension. WAL write monitor is defined
|
||||||
|
* in LogObserver while WAL Restore is in RegionObserver.
|
||||||
|
*
|
||||||
|
* It will monitor a WAL writing and Restore, modify passed-in WALEdit, i.e,
|
||||||
|
* ignore specified columns when writing, and add a KeyValue. On the other
|
||||||
|
* hand, it checks whether the ignored column is still in WAL when Restoreed
|
||||||
|
* at region reconstruct.
|
||||||
|
*/
|
||||||
|
public class SampleRegionWALObserver extends BaseRegionObserverCoprocessor
|
||||||
|
implements WALObserver {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(SampleRegionWALObserver.class);
|
||||||
|
|
||||||
|
private byte[] tableName;
|
||||||
|
private byte[] row;
|
||||||
|
private byte[] ignoredFamily;
|
||||||
|
private byte[] ignoredQualifier;
|
||||||
|
private byte[] addedFamily;
|
||||||
|
private byte[] addedQualifier;
|
||||||
|
private byte[] changedFamily;
|
||||||
|
private byte[] changedQualifier;
|
||||||
|
|
||||||
|
private boolean preWALWriteCalled = false;
|
||||||
|
private boolean postWALWriteCalled = false;
|
||||||
|
private boolean preWALRestoreCalled = false;
|
||||||
|
private boolean postWALRestoreCalled = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set values: with a table name, a column name which will be ignored, and
|
||||||
|
* a column name which will be added to WAL.
|
||||||
|
*/
|
||||||
|
public void setTestValues(byte[] tableName, byte[] row, byte[] igf, byte[] igq,
|
||||||
|
byte[] chf, byte[] chq, byte[] addf, byte[] addq) {
|
||||||
|
this.row = row;
|
||||||
|
this.tableName = tableName;
|
||||||
|
this.ignoredFamily = igf;
|
||||||
|
this.ignoredQualifier = igq;
|
||||||
|
this.addedFamily = addf;
|
||||||
|
this.addedQualifier = addq;
|
||||||
|
this.changedFamily = chf;
|
||||||
|
this.changedQualifier = chq;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postWALWrite(CoprocessorEnvironment env, HRegionInfo info,
|
||||||
|
HLogKey logKey, WALEdit logEdit) throws IOException {
|
||||||
|
postWALWriteCalled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean preWALWrite(CoprocessorEnvironment env, HRegionInfo info,
|
||||||
|
HLogKey logKey, WALEdit logEdit) throws IOException {
|
||||||
|
boolean bypass = false;
|
||||||
|
// check table name matches or not.
|
||||||
|
if (!Arrays.equals(HRegionInfo.getTableName(info.getRegionName()), this.tableName)) {
|
||||||
|
return bypass;
|
||||||
|
}
|
||||||
|
preWALWriteCalled = true;
|
||||||
|
// here we're going to remove one keyvalue from the WALEdit, and add
|
||||||
|
// another one to it.
|
||||||
|
List<KeyValue> kvs = logEdit.getKeyValues();
|
||||||
|
KeyValue deletedKV = null;
|
||||||
|
for (KeyValue kv : kvs) {
|
||||||
|
// assume only one kv from the WALEdit matches.
|
||||||
|
byte[] family = kv.getFamily();
|
||||||
|
byte[] qulifier = kv.getQualifier();
|
||||||
|
|
||||||
|
if (Arrays.equals(family, ignoredFamily) &&
|
||||||
|
Arrays.equals(qulifier, ignoredQualifier)) {
|
||||||
|
LOG.debug("Found the KeyValue from WALEdit which should be ignored.");
|
||||||
|
deletedKV = kv;
|
||||||
|
}
|
||||||
|
if (Arrays.equals(family, changedFamily) &&
|
||||||
|
Arrays.equals(qulifier, changedQualifier)) {
|
||||||
|
LOG.debug("Found the KeyValue from WALEdit which should be changed.");
|
||||||
|
kv.getBuffer()[kv.getValueOffset()] += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
kvs.add(new KeyValue(row, addedFamily, addedQualifier));
|
||||||
|
if (deletedKV != null) {
|
||||||
|
LOG.debug("About to delete a KeyValue from WALEdit.");
|
||||||
|
kvs.remove(deletedKV);
|
||||||
|
}
|
||||||
|
return bypass;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Triggered before {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is
|
||||||
|
* Restoreed.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void preWALRestore(RegionCoprocessorEnvironment env, HRegionInfo info,
|
||||||
|
HLogKey logKey, WALEdit logEdit) throws IOException {
|
||||||
|
preWALRestoreCalled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is
|
||||||
|
* Restoreed.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void postWALRestore(RegionCoprocessorEnvironment env,
|
||||||
|
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
|
||||||
|
postWALRestoreCalled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isPreWALWriteCalled() {
|
||||||
|
return preWALWriteCalled;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isPostWALWriteCalled() {
|
||||||
|
return postWALWriteCalled;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isPreWALRestoreCalled() {
|
||||||
|
LOG.debug(SampleRegionWALObserver.class.getName() +
|
||||||
|
".isPreWALRestoreCalled is called.");
|
||||||
|
return preWALRestoreCalled;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isPostWALRestoreCalled() {
|
||||||
|
LOG.debug(SampleRegionWALObserver.class.getName() +
|
||||||
|
".isPostWALRestoreCalled is called.");
|
||||||
|
return postWALRestoreCalled;
|
||||||
|
}
|
||||||
|
}
|
|
@ -67,6 +67,8 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
||||||
boolean hadPostGetClosestRowBefore = false;
|
boolean hadPostGetClosestRowBefore = false;
|
||||||
boolean hadPreIncrement = false;
|
boolean hadPreIncrement = false;
|
||||||
boolean hadPostIncrement = false;
|
boolean hadPostIncrement = false;
|
||||||
|
boolean hadPreWALRestored = false;
|
||||||
|
boolean hadPostWALRestored = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preOpen(RegionCoprocessorEnvironment e) {
|
public void preOpen(RegionCoprocessorEnvironment e) {
|
||||||
|
@ -333,4 +335,12 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
||||||
boolean hadPostIncrement() {
|
boolean hadPostIncrement() {
|
||||||
return hadPostIncrement;
|
return hadPostIncrement;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean hadPreWALRestored() {
|
||||||
|
return hadPreWALRestored;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean hadPostWALRestored() {
|
||||||
|
return hadPostWALRestored;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,373 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2010 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests invocation of the {@link org.apache.hadoop.hbase.coprocessor.MasterObserver}
|
||||||
|
* interface hooks at all appropriate times during normal HMaster operations.
|
||||||
|
*/
|
||||||
|
public class TestWALCoprocessors {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestWALCoprocessors.class);
|
||||||
|
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static byte[] TEST_TABLE = Bytes.toBytes("observedTable");
|
||||||
|
private static byte[][] TEST_FAMILY = { Bytes.toBytes("fam1"),
|
||||||
|
Bytes.toBytes("fam2"),
|
||||||
|
Bytes.toBytes("fam3"),
|
||||||
|
};
|
||||||
|
private static byte[][] TEST_QUALIFIER = { Bytes.toBytes("q1"),
|
||||||
|
Bytes.toBytes("q2"),
|
||||||
|
Bytes.toBytes("q3"),
|
||||||
|
};
|
||||||
|
private static byte[][] TEST_VALUE = { Bytes.toBytes("v1"),
|
||||||
|
Bytes.toBytes("v2"),
|
||||||
|
Bytes.toBytes("v3"),
|
||||||
|
};
|
||||||
|
private static byte[] TEST_ROW = Bytes.toBytes("testRow");
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
private FileSystem fs;
|
||||||
|
private Path dir;
|
||||||
|
private MiniDFSCluster cluster;
|
||||||
|
private Path hbaseRootDir;
|
||||||
|
private Path oldLogDir;
|
||||||
|
private Path logDir;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupBeforeClass() throws Exception {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
|
||||||
|
SampleRegionWALObserver.class.getName());
|
||||||
|
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||||
|
SampleRegionWALObserver.class.getName());
|
||||||
|
conf.setBoolean("dfs.support.append", true);
|
||||||
|
conf.setInt("dfs.client.block.recovery.retries", 2);
|
||||||
|
conf.setInt("hbase.regionserver.flushlogentries", 1);
|
||||||
|
|
||||||
|
TEST_UTIL.startMiniCluster(1);
|
||||||
|
TEST_UTIL.setNameNodeNameSystemLeasePeriod(100, 10000);
|
||||||
|
Path hbaseRootDir =
|
||||||
|
TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
|
||||||
|
LOG.info("hbase.rootdir=" + hbaseRootDir);
|
||||||
|
conf.set(HConstants.HBASE_DIR, hbaseRootDir.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void teardownAfterClass() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
|
||||||
|
//this.cluster = TEST_UTIL.getDFSCluster();
|
||||||
|
this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
|
||||||
|
this.hbaseRootDir = new Path(conf.get(HConstants.HBASE_DIR));
|
||||||
|
this.dir = new Path(this.hbaseRootDir, TestWALCoprocessors.class.getName());
|
||||||
|
this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||||
|
this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME);
|
||||||
|
|
||||||
|
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
|
||||||
|
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test WAL write behavior with WALObserver. The coprocessor monitors
|
||||||
|
* a WALEdit written to WAL, and ignore, modify, and add KeyValue's for the
|
||||||
|
* WALEdit.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testWWALCoprocessorWriteToWAL() throws Exception {
|
||||||
|
HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
|
||||||
|
Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
|
||||||
|
deleteDir(basedir);
|
||||||
|
fs.mkdirs(new Path(basedir, hri.getEncodedName()));
|
||||||
|
|
||||||
|
HLog log = new HLog(this.fs, this.dir, this.oldLogDir, this.conf);
|
||||||
|
SampleRegionWALObserver cp = getCoprocessor(log);
|
||||||
|
|
||||||
|
// TEST_FAMILY[0] shall be removed from WALEdit.
|
||||||
|
// TEST_FAMILY[1] value shall be changed.
|
||||||
|
// TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put.
|
||||||
|
cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0],
|
||||||
|
TEST_FAMILY[1], TEST_QUALIFIER[1],
|
||||||
|
TEST_FAMILY[2], TEST_QUALIFIER[2]);
|
||||||
|
|
||||||
|
assertFalse(cp.isPreWALWriteCalled());
|
||||||
|
assertFalse(cp.isPostWALWriteCalled());
|
||||||
|
|
||||||
|
// TEST_FAMILY[2] is not in the put, however it shall be added by the tested
|
||||||
|
// coprocessor.
|
||||||
|
// Use a Put to create familyMap.
|
||||||
|
Put p = creatPutWith2Families(TEST_ROW);
|
||||||
|
|
||||||
|
Map<byte [], List<KeyValue>> familyMap = p.getFamilyMap();
|
||||||
|
WALEdit edit = new WALEdit();
|
||||||
|
addFamilyMapToWALEdit(familyMap, edit);
|
||||||
|
|
||||||
|
boolean foundFamily0 = false;
|
||||||
|
boolean foundFamily2 = false;
|
||||||
|
boolean modifiedFamily1 = false;
|
||||||
|
|
||||||
|
List<KeyValue> kvs = edit.getKeyValues();
|
||||||
|
|
||||||
|
for (KeyValue kv : kvs) {
|
||||||
|
if (Arrays.equals(kv.getFamily(), TEST_FAMILY[0])) {
|
||||||
|
foundFamily0 = true;
|
||||||
|
}
|
||||||
|
if (Arrays.equals(kv.getFamily(), TEST_FAMILY[2])) {
|
||||||
|
foundFamily2 = true;
|
||||||
|
}
|
||||||
|
if (Arrays.equals(kv.getFamily(), TEST_FAMILY[1])) {
|
||||||
|
if (!Arrays.equals(kv.getValue(), TEST_VALUE[1])) {
|
||||||
|
modifiedFamily1 = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue(foundFamily0);
|
||||||
|
assertFalse(foundFamily2);
|
||||||
|
assertFalse(modifiedFamily1);
|
||||||
|
|
||||||
|
// it's where WAL write cp should occur.
|
||||||
|
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
|
log.append(hri, hri.getTableDesc().getName(), edit, now);
|
||||||
|
|
||||||
|
// the edit shall have been change now by the coprocessor.
|
||||||
|
foundFamily0 = false;
|
||||||
|
foundFamily2 = false;
|
||||||
|
modifiedFamily1 = false;
|
||||||
|
for (KeyValue kv : kvs) {
|
||||||
|
if (Arrays.equals(kv.getFamily(), TEST_FAMILY[0])) {
|
||||||
|
foundFamily0 = true;
|
||||||
|
}
|
||||||
|
if (Arrays.equals(kv.getFamily(), TEST_FAMILY[2])) {
|
||||||
|
foundFamily2 = true;
|
||||||
|
}
|
||||||
|
if (Arrays.equals(kv.getFamily(), TEST_FAMILY[1])) {
|
||||||
|
if (!Arrays.equals(kv.getValue(), TEST_VALUE[1])) {
|
||||||
|
modifiedFamily1 = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertFalse(foundFamily0);
|
||||||
|
assertTrue(foundFamily2);
|
||||||
|
assertTrue(modifiedFamily1);
|
||||||
|
|
||||||
|
assertTrue(cp.isPreWALWriteCalled());
|
||||||
|
assertTrue(cp.isPostWALWriteCalled());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test WAL replay behavior with WALObserver.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testWALCoprocessorReplay() throws Exception {
|
||||||
|
// WAL replay is handled at HRegion::replayRecoveredEdits(), which is
|
||||||
|
// ultimately called by HRegion::initialize()
|
||||||
|
byte[] tableName = Bytes.toBytes("testWALCoprocessorReplay");
|
||||||
|
|
||||||
|
final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
|
||||||
|
final Path basedir = new Path(this.hbaseRootDir, Bytes.toString(tableName));
|
||||||
|
deleteDir(basedir);
|
||||||
|
fs.mkdirs(new Path(basedir, hri.getEncodedName()));
|
||||||
|
|
||||||
|
//HLog wal = new HLog(this.fs, this.dir, this.oldLogDir, this.conf);
|
||||||
|
HLog wal = createWAL(this.conf);
|
||||||
|
//Put p = creatPutWith2Families(TEST_ROW);
|
||||||
|
WALEdit edit = new WALEdit();
|
||||||
|
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
|
//addFamilyMapToWALEdit(p.getFamilyMap(), edit);
|
||||||
|
final int countPerFamily = 1000;
|
||||||
|
for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
|
||||||
|
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
|
||||||
|
EnvironmentEdgeManager.getDelegate(), wal);
|
||||||
|
}
|
||||||
|
wal.append(hri, tableName, edit, now);
|
||||||
|
// sync to fs.
|
||||||
|
wal.sync();
|
||||||
|
|
||||||
|
final Configuration newConf = HBaseConfiguration.create(this.conf);
|
||||||
|
User user = HBaseTestingUtility.getDifferentUser(newConf,
|
||||||
|
".replay.wal.secondtime");
|
||||||
|
user.runAs(new PrivilegedExceptionAction() {
|
||||||
|
public Object run() throws Exception {
|
||||||
|
runWALSplit(newConf);
|
||||||
|
FileSystem newFS = FileSystem.get(newConf);
|
||||||
|
// Make a new wal for new region open.
|
||||||
|
HLog wal2 = createWAL(newConf);
|
||||||
|
HRegion region2 = new HRegion(basedir, wal2, FileSystem.get(newConf),
|
||||||
|
newConf, hri, TEST_UTIL.getHBaseCluster().getRegionServer(0));
|
||||||
|
long seqid2 = region2.initialize();
|
||||||
|
|
||||||
|
SampleRegionWALObserver cp2 =
|
||||||
|
(SampleRegionWALObserver)region2.getCoprocessorHost().findCoprocessor(
|
||||||
|
SampleRegionWALObserver.class.getName());
|
||||||
|
// TODO: asserting here is problematic.
|
||||||
|
assertNotNull(cp2);
|
||||||
|
assertTrue(cp2.isPreWALRestoreCalled());
|
||||||
|
assertTrue(cp2.isPostWALRestoreCalled());
|
||||||
|
region2.close();
|
||||||
|
wal2.closeAndDelete();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Test to see CP loaded successfully or not. There is a duplication
|
||||||
|
* at TestHLog, but the purpose of that one is to see whether the loaded
|
||||||
|
* CP will impact existing HLog tests or not.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testWALCoprocessorLoaded() throws Exception {
|
||||||
|
HLog log = new HLog(fs, dir, oldLogDir, conf);
|
||||||
|
assertNotNull(getCoprocessor(log));
|
||||||
|
}
|
||||||
|
|
||||||
|
private SampleRegionWALObserver getCoprocessor(HLog wal) throws Exception {
|
||||||
|
WALCoprocessorHost host = wal.getCoprocessorHost();
|
||||||
|
Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
|
||||||
|
return (SampleRegionWALObserver)c;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Creates an HRI around an HTD that has <code>tableName</code> and three
|
||||||
|
* column families named.
|
||||||
|
* @param tableName Name of table to use when we create HTableDescriptor.
|
||||||
|
*/
|
||||||
|
private HRegionInfo createBasic3FamilyHRegionInfo(final String tableName) {
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||||
|
|
||||||
|
for (int i = 0; i < TEST_FAMILY.length; i++ ) {
|
||||||
|
HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]);
|
||||||
|
htd.addFamily(a);
|
||||||
|
}
|
||||||
|
return new HRegionInfo(htd, null, null, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @param p Directory to cleanup
|
||||||
|
*/
|
||||||
|
private void deleteDir(final Path p) throws IOException {
|
||||||
|
if (this.fs.exists(p)) {
|
||||||
|
if (!this.fs.delete(p, true)) {
|
||||||
|
throw new IOException("Failed remove of " + p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Put creatPutWith2Families(byte[] row) throws IOException {
|
||||||
|
Put p = new Put(row);
|
||||||
|
for (int i = 0; i < TEST_FAMILY.length-1; i++ ) {
|
||||||
|
p.add(TEST_FAMILY[i], TEST_QUALIFIER[i],
|
||||||
|
TEST_VALUE[i]);
|
||||||
|
}
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copied from HRegion.
|
||||||
|
*
|
||||||
|
* @param familyMap map of family->edits
|
||||||
|
* @param walEdit the destination entry to append into
|
||||||
|
*/
|
||||||
|
private void addFamilyMapToWALEdit(Map<byte[], List<KeyValue>> familyMap,
|
||||||
|
WALEdit walEdit) {
|
||||||
|
for (List<KeyValue> edits : familyMap.values()) {
|
||||||
|
for (KeyValue kv : edits) {
|
||||||
|
walEdit.add(kv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
private Path runWALSplit(final Configuration c) throws IOException {
|
||||||
|
FileSystem fs = FileSystem.get(c);
|
||||||
|
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c,
|
||||||
|
this.hbaseRootDir, this.logDir, this.oldLogDir, fs);
|
||||||
|
List<Path> splits = logSplitter.splitLog();
|
||||||
|
// Split should generate only 1 file since there's only 1 region
|
||||||
|
assertEquals(1, splits.size());
|
||||||
|
// Make sure the file exists
|
||||||
|
assertTrue(fs.exists(splits.get(0)));
|
||||||
|
LOG.info("Split file=" + splits.get(0));
|
||||||
|
return splits.get(0);
|
||||||
|
}
|
||||||
|
private HLog createWAL(final Configuration c) throws IOException {
|
||||||
|
HLog wal = new HLog(FileSystem.get(c), logDir, oldLogDir, c);
|
||||||
|
return wal;
|
||||||
|
}
|
||||||
|
private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
|
||||||
|
final byte [] rowName, final byte [] family,
|
||||||
|
final int count, EnvironmentEdge ee, final HLog wal)
|
||||||
|
throws IOException {
|
||||||
|
String familyStr = Bytes.toString(family);
|
||||||
|
for (int j = 0; j < count; j++) {
|
||||||
|
byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
|
||||||
|
byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
|
||||||
|
WALEdit edit = new WALEdit();
|
||||||
|
edit.add(new KeyValue(rowName, family, qualifierBytes,
|
||||||
|
ee.currentTimeMillis(), columnBytes));
|
||||||
|
wal.append(hri, tableName, edit, ee.currentTimeMillis());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
@ -45,6 +46,9 @@ import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
|
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.Coprocessor;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
|
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
|
||||||
|
@ -107,6 +111,8 @@ public class TestHLog {
|
||||||
.setInt("ipc.client.connect.max.retries", 1);
|
.setInt("ipc.client.connect.max.retries", 1);
|
||||||
TEST_UTIL.getConfiguration().setInt(
|
TEST_UTIL.getConfiguration().setInt(
|
||||||
"dfs.client.block.recovery.retries", 1);
|
"dfs.client.block.recovery.retries", 1);
|
||||||
|
TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
|
||||||
|
SampleRegionWALObserver.class.getName());
|
||||||
TEST_UTIL.startMiniCluster(3);
|
TEST_UTIL.startMiniCluster(3);
|
||||||
|
|
||||||
conf = TEST_UTIL.getConfiguration();
|
conf = TEST_UTIL.getConfiguration();
|
||||||
|
@ -640,6 +646,18 @@ public class TestHLog {
|
||||||
assertEquals(0, log.getNumLogFiles());
|
assertEquals(0, log.getNumLogFiles());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A loaded WAL coprocessor won't break existing HLog test cases.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testWALCoprocessorLoaded() throws Exception {
|
||||||
|
// test to see whether the coprocessor is loaded or not.
|
||||||
|
HLog log = new HLog(fs, dir, oldLogDir, conf);
|
||||||
|
WALCoprocessorHost host = log.getCoprocessorHost();
|
||||||
|
Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
|
||||||
|
assertNotNull(c);
|
||||||
|
}
|
||||||
|
|
||||||
private void addEdits(HLog log, HRegionInfo hri, byte [] tableName,
|
private void addEdits(HLog log, HRegionInfo hri, byte [] tableName,
|
||||||
int times) throws IOException {
|
int times) throws IOException {
|
||||||
final byte [] row = Bytes.toBytes("row");
|
final byte [] row = Bytes.toBytes("row");
|
||||||
|
|
Loading…
Reference in New Issue