diff --git a/hbase-examples/pom.xml b/hbase-examples/pom.xml index c72a874f9f3..9f32fec1de3 100644 --- a/hbase-examples/pom.xml +++ b/hbase-examples/pom.xml @@ -183,6 +183,18 @@ <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + </dependency> <dependency> <groupId>com.github.stephenc.findbugs</groupId> <artifactId>findbugs-annotations</artifactId> diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/DelegatingInternalScanner.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/DelegatingInternalScanner.java new file mode 100644 index 00000000000..f781a338725 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/DelegatingInternalScanner.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor.example; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; + +/** + * A simple delegating {@link InternalScanner} implementation, meant as a base class for scanners that filter cells.
+ */ +public class DelegatingInternalScanner implements InternalScanner { + + protected final InternalScanner scanner; + + public DelegatingInternalScanner(InternalScanner scanner) { + this.scanner = scanner; + } + + @Override + public boolean next(List result, ScannerContext scannerContext) throws IOException { + return scanner.next(result, scannerContext); + } + + @Override + public void close() throws IOException { + scanner.close(); + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java index 7f2a906be81..f849c8653b4 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java @@ -1,4 +1,4 @@ -/* +/** * Copyright The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one or more @@ -20,54 +20,40 @@ package org.apache.hadoop.hbase.coprocessor.example; import java.io.IOException; import java.util.List; -import java.util.NavigableSet; import java.util.Optional; -import java.util.OptionalInt; +import java.util.OptionalLong; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.retry.RetryForever; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.IsolationLevel; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.KeyValueScanner; -import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; /** - * This is an example showing how a RegionObserver could configured - * via ZooKeeper in order to control a Region compaction, flush, and scan policy. - * - * This also demonstrated the use of shared - * {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} state. - * See {@link RegionCoprocessorEnvironment#getSharedData()}. 
- * - * This would be useful for an incremental backup tool, which would indicate the last - * time of a successful backup via ZK and instruct HBase to not delete data that was - * inserted since (based on wall clock time). - * - * This implements org.apache.zookeeper.Watcher directly instead of using - * {@link org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher}, - * because RegionObservers come and go and currently - * listeners registered with ZooKeeperWatcher cannot be removed. + * This is an example showing how a RegionObserver could be configured via ZooKeeper in order to + * control a Region compaction, flush, and scan policy. This also demonstrates the use of shared + * {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} state. See + * {@link RegionCoprocessorEnvironment#getSharedData()}. + * <p>
+ * This would be useful for an incremental backup tool, which would indicate the last time of a + * successful backup via ZK and instruct HBase that it is safe to delete the data which has already been + * backed up. */ public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObserver { + @Override public Optional<RegionObserver> getRegionObserver() { return Optional.of(this); @@ -78,178 +64,138 @@ public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObs public static final String ZK_SESSION_TIMEOUT_KEY = "ZooKeeperScanPolicyObserver.zookeeper.session.timeout"; public static final int ZK_SESSION_TIMEOUT_DEFAULT = 30 * 1000; // 30 secs - public static final String node = "/backup/example/lastbackup"; - public static final String zkkey = "ZK"; - private static final Log LOG = LogFactory.getLog(ZooKeeperScanPolicyObserver.class); + public static final String NODE = "/backup/example/lastbackup"; + private static final String ZKKEY = "ZK"; - private ZooKeeper zk = null; + private NodeCache cache; /** * Internal watcher that keep "data" up to date asynchronously. */ - private static class ZKWatcher implements Watcher { - private byte[] data = null; - private ZooKeeper zk; - private volatile boolean needSetup = true; - private volatile long lastSetupTry = 0; + private static final class ZKDataHolder { - public ZKWatcher(ZooKeeper zk) { - this.zk = zk; - // trigger the listening - getData(); + private final String ensemble; + + private final int sessionTimeout; + + private CuratorFramework client; + + private NodeCache cache; + + private int ref; + + public ZKDataHolder(String ensemble, int sessionTimeout) { + this.ensemble = ensemble; + this.sessionTimeout = sessionTimeout; } - /** - * Get the maintained data. In case of any ZK exceptions this will retry - * establishing the connection (but not more than twice/minute). - * - * getData is on the critical path, so make sure it is fast unless there is - * a problem (network partion, ZK ensemble down, etc) - * Make sure at most one (unlucky) thread retries and other threads don't pile up - * while that threads tries to recreate the connection. - * - * @return the last know version of the data - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION") - public byte[] getData() { - // try at most twice/minute - if (needSetup && EnvironmentEdgeManager.currentTime() > lastSetupTry + 30000) { - synchronized (this) { - // make sure only one thread tries to reconnect - if (needSetup) { - needSetup = false; - } else { - return data; - } - } - // do this without the lock held to avoid threads piling up on this lock, - // as it can take a while + private void create() throws Exception { + client = + CuratorFrameworkFactory.builder().connectString(ensemble).sessionTimeoutMs(sessionTimeout) + .retryPolicy(new RetryForever(1000)).canBeReadOnly(true).build(); + client.start(); + cache = new NodeCache(client, NODE); + cache.start(true); + } + + private void close() { + if (cache != null) { try { - LOG.debug("Connecting to ZK"); - // record this attempt - lastSetupTry = EnvironmentEdgeManager.currentTime(); - if (zk.exists(node, false) != null) { - data = zk.getData(node, this, null); - LOG.debug("Read synchronously: "+(data == null ?
"null" : Bytes.toLong(data))); - } else { - zk.exists(node, this); - } - } catch (Exception x) { - // try again if this fails - needSetup = true; + cache.close(); + } catch (IOException e) { + // should not happen + throw new AssertionError(e); + } + cache = null; + } + if (client != null) { + client.close(); + client = null; + } + } + + public synchronized NodeCache acquire() throws Exception { + if (ref == 0) { + try { + create(); + } catch (Exception e) { + close(); + throw e; } } - return data; + ref++; + return cache; } - @Override - public void process(WatchedEvent event) { - switch (event.getType()) { - case NodeDataChanged: - case NodeCreated: - try { - // get data and re-watch - data = zk.getData(node, this, null); - LOG.debug("Read asynchronously: " + (data == null ? "null" : Bytes.toLong(data))); - } catch (InterruptedException ix) { - } catch (KeeperException kx) { - needSetup = true; - } - break; - - case NodeDeleted: - try { - // just re-watch - zk.exists(node, this); - data = null; - } catch (InterruptedException ix) { - } catch (KeeperException kx) { - needSetup = true; - } - break; - - default: - // ignore + public synchronized void release() { + ref--; + if (ref == 0) { + close(); } } } @Override - public void start(CoprocessorEnvironment e) throws IOException { - RegionCoprocessorEnvironment re = (RegionCoprocessorEnvironment) e; - if (!re.getSharedData().containsKey(zkkey)) { - // there is a short race here - // in the worst case we create a watcher that will be notified once - String ensemble = re.getConfiguration().get(ZK_ENSEMBLE_KEY); - int sessionTimeout = re.getConfiguration().getInt(ZK_SESSION_TIMEOUT_KEY, - ZK_SESSION_TIMEOUT_DEFAULT); - this.zk = new ZooKeeper(ensemble, sessionTimeout, null); - re.getSharedData().putIfAbsent(zkkey, new ZKWatcher(zk)); + public void start(CoprocessorEnvironment env) throws IOException { + RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env; + try { + this.cache = ((ZKDataHolder) renv.getSharedData().computeIfAbsent(ZKKEY, k -> { + String ensemble = renv.getConfiguration().get(ZK_ENSEMBLE_KEY); + int sessionTimeout = + renv.getConfiguration().getInt(ZK_SESSION_TIMEOUT_KEY, ZK_SESSION_TIMEOUT_DEFAULT); + return new ZKDataHolder(ensemble, sessionTimeout); + })).acquire(); + } catch (Exception e) { + throw new IOException(e); } } @Override public void stop(CoprocessorEnvironment env) throws IOException { - if (this.zk != null) { - try { - this.zk.close(); - } catch (InterruptedException e) { - LOG.error("Excepion while closing the ZK connection!", e); - } - } + RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env; + this.cache = null; + ((ZKDataHolder) renv.getSharedData().get(ZKKEY)).release(); } - protected ScanInfo getScanInfo(Store store, RegionCoprocessorEnvironment e) { - byte[] data = ((ZKWatcher) e.getSharedData().get(zkkey)).getData(); + private OptionalLong getExpireBefore() { + ChildData data = cache.getCurrentData(); if (data == null) { - return null; + return OptionalLong.empty(); } - ScanInfo oldSI = ((HStore) store).getScanInfo(); - if (oldSI.getTtl() == Long.MAX_VALUE) { - return null; + byte[] bytes = data.getData(); + if (bytes == null || bytes.length != Long.BYTES) { + return OptionalLong.empty(); } - long ttl = Math.max(EnvironmentEdgeManager.currentTime() - Bytes.toLong(data), oldSI.getTtl()); - return new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(), ttl, - oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); + return 
OptionalLong.of(Bytes.toLong(bytes)); + } + + private InternalScanner wrap(InternalScanner scanner) { + OptionalLong optExpireBefore = getExpireBefore(); + if (!optExpireBefore.isPresent()) { + return scanner; + } + long expireBefore = optExpireBefore.getAsLong(); + return new DelegatingInternalScanner(scanner) { + + @Override + public boolean next(List result, ScannerContext scannerContext) throws IOException { + boolean moreRows = scanner.next(result, scannerContext); + result.removeIf(c -> c.getTimestamp() < expireBefore); + return moreRows; + } + }; } @Override - public InternalScanner preFlushScannerOpen(ObserverContext c, - Store store, List scanners, InternalScanner s, long readPoint) - throws IOException { - ScanInfo scanInfo = getScanInfo(store, c.getEnvironment()); - if (scanInfo == null) { - // take default action - return null; - } - return new StoreScanner((HStore) store, scanInfo, OptionalInt.empty(), scanners, - ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); + public InternalScanner preFlush(ObserverContext c, Store store, + InternalScanner scanner) throws IOException { + return wrap(scanner); } @Override - public InternalScanner preCompactScannerOpen(ObserverContext c, - Store store, List scanners, ScanType scanType, long earliestPutTs, - InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request, - long readPoint) throws IOException { - ScanInfo scanInfo = getScanInfo(store, c.getEnvironment()); - if (scanInfo == null) { - // take default action - return null; - } - return new StoreScanner((HStore) store, scanInfo, OptionalInt.empty(), scanners, scanType, - store.getSmallestReadPoint(), earliestPutTs); - } - - @Override - public KeyValueScanner preStoreScannerOpen(ObserverContext c, - Store store, Scan scan, NavigableSet targetCols, KeyValueScanner s, long readPoint) - throws IOException { - ScanInfo scanInfo = getScanInfo(store, c.getEnvironment()); - if (scanInfo == null) { - // take default action - return null; - } - return new StoreScanner((HStore) store, scanInfo, scan, targetCols, - ((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED)); + public InternalScanner preCompact(ObserverContext c, Store store, + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + return wrap(scanner); } } \ No newline at end of file diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java new file mode 100644 index 00000000000..2c40cbecec3 --- /dev/null +++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor.example; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ CoprocessorTests.class, MediumTests.class }) +public class TestZooKeeperScanPolicyObserver { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static TableName NAME = TableName.valueOf("TestCP"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] QUALIFIER = Bytes.toBytes("cq"); + + private static Table TABLE; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(3); + UTIL.getAdmin() + .createTable(TableDescriptorBuilder.newBuilder(NAME) + .addCoprocessor(ZooKeeperScanPolicyObserver.class.getName()) + .setValue(ZooKeeperScanPolicyObserver.ZK_ENSEMBLE_KEY, + "localhost:" + UTIL.getZkCluster().getClientPort()) + .setValue(ZooKeeperScanPolicyObserver.ZK_SESSION_TIMEOUT_KEY, "2000") + .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build()).build()); + TABLE = UTIL.getConnection().getTable(NAME); + } + + @AfterClass + public static void tearDown() throws Exception { + if (TABLE != null) { + TABLE.close(); + } + UTIL.shutdownMiniCluster(); + } + + private void setExpireBefore(long time) + throws KeeperException, InterruptedException, IOException { + ZooKeeper zk = UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().getZooKeeper(); + if (zk.exists(ZooKeeperScanPolicyObserver.NODE, false) == null) { + zk.create(ZooKeeperScanPolicyObserver.NODE, Bytes.toBytes(time), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } else { + zk.setData(ZooKeeperScanPolicyObserver.NODE, Bytes.toBytes(time), -1); + } + } + + private void assertValueEquals(int start, int end) throws IOException { + for (int i = start; i < end; i++) { + assertEquals(i, + Bytes.toInt(TABLE.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUALIFIER))); + } + } + + private void assertNotExists(int start, int end) throws IOException { + for (int i = start; i < end; i++) { + assertFalse(TABLE.exists(new Get(Bytes.toBytes(i)))); + } + } + + private void put(int start, int end, long ts) throws IOException { + for (int i = start; i < end; i++) { + TABLE.put(new 
Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, ts, Bytes.toBytes(i))); + } + } + + @Test + public void test() throws IOException, KeeperException, InterruptedException { + long now = System.currentTimeMillis(); + put(0, 100, now - 10000); + assertValueEquals(0, 100); + + setExpireBefore(now - 5000); + Thread.sleep(5000); + UTIL.getAdmin().flush(NAME); + assertNotExists(0, 100); + + put(0, 50, now - 1000); + UTIL.getAdmin().flush(NAME); + put(50, 100, now - 100); + UTIL.getAdmin().flush(NAME); + assertValueEquals(0, 100); + + setExpireBefore(now - 500); + Thread.sleep(5000); + UTIL.getAdmin().majorCompact(NAME); + UTIL.waitFor(30000, () -> UTIL.getHBaseCluster().getRegions(NAME).iterator().next() + .getStore(FAMILY).getStorefilesCount() == 1); + assertNotExists(0, 50); + assertValueEquals(50, 100); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java index 21b033f0612..1e71bc85a09 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java @@ -46,11 +46,13 @@ import org.apache.yetus.audience.InterfaceStability; public interface InternalScanner extends Closeable { /** * Grab the next row's worth of values. - * @param results return output array + * @param result return output array * @return true if more rows exist after this one, false if scanner is done * @throws IOException e */ - boolean next(List results) throws IOException; + default boolean next(List result) throws IOException { + return next(result, NoLimitScannerContext.getInstance()); + } /** * Grab the next row's worth of values. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 8073bfd537a..f26575d6c39 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -140,14 +140,8 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner *

* This can ONLY be called when you are using Scanners that implement InternalScanner as well as * KeyValueScanner (a {@link StoreScanner}). - * @param result * @return true if more rows exist after this one, false if scanner is done */ - @Override - public boolean next(List result) throws IOException { - return next(result, NoLimitScannerContext.getInstance()); - } - @Override public boolean next(List result, ScannerContext scannerContext) throws IOException { if (this.current == null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 588211c4e4d..dd77f7b1ed7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -503,11 +503,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return this.heap.seek(key); } - @Override - public boolean next(List outResult) throws IOException { - return next(outResult, NoLimitScannerContext.getInstance()); - } - /** * Get the next row of values from this Store. * @param outResult diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index e6c1da95f93..32058995884 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -427,10 +427,6 @@ public class TestRegionObserverInterface { InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, CompactionRequest request) { return new InternalScanner() { - @Override - public boolean next(List results) throws IOException { - return next(results, NoLimitScannerContext.getInstance()); - } @Override public boolean next(List results, ScannerContext scannerContext) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index 6099381db0a..ce52a45927e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -139,11 +139,6 @@ public class TestRegionObserverScannerOpenHook { return false; } - @Override - public boolean next(List results) throws IOException { - return false; - } - @Override public void close() throws IOException {} }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java index 2acf1da757d..67a7519b6b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java @@ -192,16 +192,11 @@ public class TestCompactor { this.kvs = new ArrayList<>(Arrays.asList(kvs)); } - @Override - public boolean next(List results) throws IOException { - if (kvs.isEmpty()) return false; - results.add(kvs.remove(0)); - return !kvs.isEmpty(); - } - @Override public boolean next(List 
result, ScannerContext scannerContext) throws IOException { - return next(result); + if (kvs.isEmpty()) return false; + result.add(kvs.remove(0)); + return !kvs.isEmpty(); } @Override diff --git a/pom.xml b/pom.xml index faca5113a44..cb8b11fe89d 100755 --- a/pom.xml +++ b/pom.xml @@ -2094,6 +2094,21 @@ + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <version>${curator.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> <dependency> <groupId>org.apache.yetus</groupId> <artifactId>audience-annotations</artifactId>
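
Note on the expected workflow: the observer assumes some external process (for example an incremental backup tool) writes an 8-byte timestamp, encoded with Bytes.toBytes(long), into the well-known znode /backup/example/lastbackup; the NodeCache picks the change up asynchronously, and preFlush/preCompact then drop cells whose timestamp is older than that value. The test above does this with a raw ZooKeeper handle. A minimal sketch of how such a publisher might look with Curator follows; the class name and the single-writer assumption are illustrative and not part of this patch.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever;
import org.apache.hadoop.hbase.util.Bytes;

public class LastBackupPublisher {

  // Must match ZooKeeperScanPolicyObserver.NODE.
  private static final String NODE = "/backup/example/lastbackup";

  /**
   * Publish the wall-clock time of the last successful backup. Cells with a timestamp
   * older than this value become eligible for removal on the next flush or compaction
   * of tables that load ZooKeeperScanPolicyObserver.
   */
  public static void publish(String ensemble, long lastBackupTime) throws Exception {
    CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(ensemble).retryPolicy(new RetryForever(1000)).build();
    client.start();
    try {
      byte[] data = Bytes.toBytes(lastBackupTime);
      // Create the znode on first use, overwrite it afterwards; assumes a single writer.
      if (client.checkExists().forPath(NODE) == null) {
        client.create().creatingParentsIfNeeded().forPath(NODE, data);
      } else {
        client.setData().forPath(NODE, data);
      }
    } finally {
      client.close();
    }
  }
}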
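
DelegatingInternalScanner exists so that coprocessors which only want to post-filter cells no longer have to implement both next overloads plus close; with next(List) now a default method on InternalScanner, overriding the limit-aware variant is enough. The anonymous subclass returned by wrap() above is one use; a named subclass would look like the sketch below, where the class name and predicate are hypothetical and only illustrate the intended extension point.

package org.apache.hadoop.hbase.coprocessor.example;

import java.io.IOException;
import java.util.List;
import java.util.function.Predicate;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;

public class PredicateFilteringScanner extends DelegatingInternalScanner {

  private final Predicate<Cell> dropIf;

  public PredicateFilteringScanner(InternalScanner scanner, Predicate<Cell> dropIf) {
    super(scanner);
    this.dropIf = dropIf;
  }

  @Override
  public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
    // Let the wrapped scanner fill the batch, then drop the cells we do not want;
    // the no-argument next(List) default in InternalScanner funnels into this method.
    boolean moreRows = scanner.next(result, scannerContext);
    result.removeIf(dropIf);
    return moreRows;
  }
}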