diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleRegionObserverWithMetrics.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleRegionObserverWithMetrics.java index f03b91513eb..4e31d226aef 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleRegionObserverWithMetrics.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleRegionObserverWithMetrics.java @@ -35,6 +35,11 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.metrics.Counter; import org.apache.hadoop.hbase.metrics.MetricRegistry; import org.apache.hadoop.hbase.metrics.Timer; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; /** * An example coprocessor that collects some metrics to demonstrate the usage of exporting custom @@ -50,6 +55,8 @@ import org.apache.hadoop.hbase.metrics.Timer; public class ExampleRegionObserverWithMetrics implements RegionCoprocessor { private Counter preGetCounter; + private Counter flushCounter; + private Counter filesCompactedCounter; private Timer costlyOperationTimer; private ExampleRegionObserver observer; @@ -79,6 +86,30 @@ public class ExampleRegionObserverWithMetrics implements RegionCoprocessor { } } + @Override + public void postFlush( + ObserverContext c, + FlushLifeCycleTracker tracker) throws IOException { + flushCounter.increment(); + } + + @Override + public void postFlush( + ObserverContext c, Store store, StoreFile resultFile, + FlushLifeCycleTracker tracker) throws IOException { + flushCounter.increment(); + } + + @Override + public void postCompactSelection( + ObserverContext c, Store store, + List selected, CompactionLifeCycleTracker tracker, + CompactionRequest request) { + if (selected != null) { + filesCompactedCounter.increment(selected.size()); + } + } + private void performCostlyOperation() { try { // simulate the operation by sleeping. @@ -119,6 +150,17 @@ public class ExampleRegionObserverWithMetrics implements RegionCoprocessor { // Create a Timer to track execution times for the costly operation. costlyOperationTimer = registry.timer("costlyOperation"); } + + if (flushCounter == null) { + // Track the number of flushes that have completed + flushCounter = registry.counter("flushesCompleted"); + } + + if (filesCompactedCounter == null) { + // Track the number of files that were compacted (many files may be rewritten in a single + // compaction). + filesCompactedCounter = registry.counter("filesCompacted"); + } } } diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ScanModifyingObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ScanModifyingObserver.java new file mode 100644 index 00000000000..942315c0e36 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ScanModifyingObserver.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor.example; + +import java.io.IOException; +import java.util.Optional; + +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A RegionObserver which modifies incoming Scan requests to include additional + * columns than what the user actually requested. + */ +public class ScanModifyingObserver implements RegionCoprocessor, RegionObserver { + + public static final String FAMILY_TO_ADD_KEY = "hbase.examples.coprocessor.scanmodifying.family"; + public static final String QUALIFIER_TO_ADD_KEY = + "hbase.examples.coprocessor.scanmodifying.qualifier"; + + private byte[] FAMILY_TO_ADD = null; + private byte[] QUALIFIER_TO_ADD = null; + + @Override + public void start( + @SuppressWarnings("rawtypes") CoprocessorEnvironment env) throws IOException { + RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env; + FAMILY_TO_ADD = Bytes.toBytes(renv.getConfiguration().get(FAMILY_TO_ADD_KEY)); + QUALIFIER_TO_ADD = Bytes.toBytes(renv.getConfiguration().get(QUALIFIER_TO_ADD_KEY)); + } + + @Override + public Optional getRegionObserver() { + // Extremely important to be sure that the coprocessor is invoked as a RegionObserver + return Optional.of(this); + } + + @Override + public void preScannerOpen( + ObserverContext c, Scan scan) throws IOException { + // Add another family:qualifier + scan.addColumn(FAMILY_TO_ADD, QUALIFIER_TO_ADD); + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ValueRewritingObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ValueRewritingObserver.java new file mode 100644 index 00000000000..863ea8947f7 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ValueRewritingObserver.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor.example; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilder; +import org.apache.hadoop.hbase.CellBuilder.DataType; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This RegionObserver replaces the values of Puts from one value to another on compaction. + */ +public class ValueRewritingObserver implements RegionObserver, RegionCoprocessor { + public static final String ORIGINAL_VALUE_KEY = + "hbase.examples.coprocessor.value.rewrite.orig"; + public static final String REPLACED_VALUE_KEY = + "hbase.examples.coprocessor.value.rewrite.replaced"; + + private byte[] sourceValue = null; + private byte[] replacedValue = null; + private Bytes.ByteArrayComparator comparator; + private CellBuilder cellBuilder; + + + @Override + public Optional getRegionObserver() { + // Extremely important to be sure that the coprocessor is invoked as a RegionObserver + return Optional.of(this); + } + + @Override + public void start( + @SuppressWarnings("rawtypes") CoprocessorEnvironment env) throws IOException { + RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env; + sourceValue = Bytes.toBytes(renv.getConfiguration().get(ORIGINAL_VALUE_KEY)); + replacedValue = Bytes.toBytes(renv.getConfiguration().get(REPLACED_VALUE_KEY)); + comparator = new Bytes.ByteArrayComparator(); + cellBuilder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); + } + + @Override + public InternalScanner preCompact( + ObserverContext c, Store store, + final InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) { + InternalScanner modifyingScanner = new InternalScanner() { + @Override + public boolean next(List result, ScannerContext scannerContext) throws IOException { + boolean ret = scanner.next(result, scannerContext); + for (int i = 0; i < result.size(); i++) { + Cell c = result.get(i); + // Replace the Cell if the value is the one we're replacing + if (CellUtil.isPut(c) && + comparator.compare(CellUtil.cloneValue(c), sourceValue) == 0) { + try { + cellBuilder.setRow(CellUtil.copyRow(c)); + cellBuilder.setFamily(CellUtil.cloneFamily(c)); + cellBuilder.setQualifier(CellUtil.cloneQualifier(c)); + cellBuilder.setTimestamp(c.getTimestamp()); + cellBuilder.setType(DataType.Put); + // Make sure each cell gets a unique value + byte[] clonedValue = new byte[replacedValue.length]; + System.arraycopy(replacedValue, 0, clonedValue, 0, replacedValue.length); + cellBuilder.setValue(clonedValue); + result.set(i, cellBuilder.build()); + } finally { + cellBuilder.clear(); + } + } + } + return ret; + } + + @Override + public void close() throws IOException { + scanner.close(); + } + }; + + return modifyingScanner; + } +} diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestScanModifyingObserver.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestScanModifyingObserver.java new file mode 100644 index 00000000000..d5d0ba1226a --- /dev/null +++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestScanModifyingObserver.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor.example; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ CoprocessorTests.class, MediumTests.class }) +public class TestScanModifyingObserver { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final TableName NAME = TableName.valueOf("TestScanModifications"); + private static final byte[] FAMILY = Bytes.toBytes("f"); + private static final ColumnFamilyDescriptor CFD = ColumnFamilyDescriptorBuilder + .newBuilder(FAMILY).build(); + private static final int NUM_ROWS = 5; + private static final byte[] EXPLICIT_QUAL = Bytes.toBytes("our_qualifier"); + private static final byte[] IMPLICIT_QUAL = Bytes.toBytes("their_qualifier"); + private static final byte[] EXPLICIT_VAL = Bytes.toBytes("provided"); + private static final byte[] IMPLICIT_VAL = Bytes.toBytes("implicit"); + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(1); + UTIL.getAdmin() + .createTable(TableDescriptorBuilder.newBuilder(NAME) + .addCoprocessor(ScanModifyingObserver.class.getName()) + .setValue(ScanModifyingObserver.FAMILY_TO_ADD_KEY, Bytes.toString(FAMILY)) + .setValue(ScanModifyingObserver.QUALIFIER_TO_ADD_KEY, Bytes.toString(IMPLICIT_QUAL)) + .addColumnFamily(CFD).build()); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + private void writeData(Table t) throws IOException { + List puts = new ArrayList<>(NUM_ROWS); + for (int i = 0; i < NUM_ROWS; i++) { + Put p = new Put(Bytes.toBytes(i + 1)); + p.addColumn(FAMILY, EXPLICIT_QUAL, EXPLICIT_VAL); + p.addColumn(FAMILY, IMPLICIT_QUAL, IMPLICIT_VAL); + puts.add(p); + } + t.put(puts); + } + + @Test + public void test() throws IOException { + try (Table t = UTIL.getConnection().getTable(NAME)) { + writeData(t); + + Scan s = new Scan(); + s.addColumn(FAMILY, EXPLICIT_QUAL); + + try (ResultScanner scanner = t.getScanner(s)) { + for (int i = 0; i < NUM_ROWS; i++) { + Result result = scanner.next(); + assertNotNull("The " + (i + 1) + "th result was unexpectedly null", result); + assertEquals(2, result.getFamilyMap(FAMILY).size()); + assertArrayEquals(Bytes.toBytes(i + 1), result.getRow()); + assertArrayEquals(EXPLICIT_VAL, result.getValue(FAMILY, EXPLICIT_QUAL)); + assertArrayEquals(IMPLICIT_VAL, result.getValue(FAMILY, IMPLICIT_QUAL)); + } + assertNull(scanner.next()); + } + } + } +} diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestValueReplacingCompaction.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestValueReplacingCompaction.java new file mode 100644 index 00000000000..206cdf64866 --- /dev/null +++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestValueReplacingCompaction.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor.example; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ CoprocessorTests.class, MediumTests.class }) +public class TestValueReplacingCompaction { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final TableName NAME = TableName.valueOf("TestValueReplacement"); + private static final byte[] FAMILY = Bytes.toBytes("f"); + private static final byte[] QUALIFIER = Bytes.toBytes("q"); + private static final ColumnFamilyDescriptor CFD = ColumnFamilyDescriptorBuilder + .newBuilder(FAMILY).build(); + private static final int NUM_ROWS = 5; + private static final String value = "foo"; + private static final String replacedValue = "bar"; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(1); + UTIL.getAdmin() + .createTable(TableDescriptorBuilder.newBuilder(NAME) + .addCoprocessor(ValueRewritingObserver.class.getName()) + .setValue(ValueRewritingObserver.ORIGINAL_VALUE_KEY, value) + .setValue(ValueRewritingObserver.REPLACED_VALUE_KEY, replacedValue) + .addColumnFamily(CFD).build()); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + private void writeData(Table t) throws IOException { + List puts = new ArrayList<>(NUM_ROWS); + for (int i = 0; i < NUM_ROWS; i++) { + Put p = new Put(Bytes.toBytes(i + 1)); + p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(value)); + puts.add(p); + } + t.put(puts); + } + + @Test + public void test() throws IOException, InterruptedException { + try (Table t = UTIL.getConnection().getTable(NAME)) { + writeData(t); + + // Flush the data + UTIL.flush(NAME); + // Issue a compaction + UTIL.compact(NAME, true); + + Scan s = new Scan(); + s.addColumn(FAMILY, QUALIFIER); + + try (ResultScanner scanner = t.getScanner(s)) { + for (int i = 0; i < NUM_ROWS; i++) { + Result result = scanner.next(); + assertNotNull("The " + (i + 1) + "th result was unexpectedly null", result); + assertEquals(1, result.getFamilyMap(FAMILY).size()); + assertArrayEquals(Bytes.toBytes(i + 1), result.getRow()); + assertArrayEquals(Bytes.toBytes(replacedValue), result.getValue(FAMILY, QUALIFIER)); + } + assertNull(scanner.next()); + } + } + } +}