From a1a5f57bffc85b5e0585466fd05c84e593fee996 Mon Sep 17 00:00:00 2001
From: Michael Stack
Date: Fri, 14 Mar 2014 23:20:14 +0000
Subject: [PATCH] HBASE-10656 high-scale-lib's Counter depends on Oracle (Sun)
 JRE, and also has some bug

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1577759 13f79535-47bb-0310-9956-ffa450edef68
---
 .../org/apache/hadoop/hbase/util/Counter.java | 182 ++++++++++++++++++
 .../apache/hadoop/hbase/util/TestCounter.java | 100 ++++++++++
 hbase-server/pom.xml                          |   4 -
 .../apache/hadoop/hbase/ipc/RpcServer.java    |   2 +-
 .../hbase/mapreduce/TableMapReduceUtil.java   |   4 -
 .../hbase/metrics/ExactCounterMetric.java     |   2 +-
 .../hbase/regionserver/CompactionTool.java    |   3 -
 .../hadoop/hbase/regionserver/HRegion.java    |   6 +-
 .../hbase/regionserver/HRegionServer.java     |   2 +-
 .../hbase/regionserver/MemStoreFlusher.java   |   2 +-
 pom.xml                                       |   5 -
 src/main/docbkx/developer.xml                 |   1 -
 12 files changed, 289 insertions(+), 24 deletions(-)
 create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/Counter.java
 create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCounter.java

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Counter.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Counter.java
new file mode 100644
index 00000000000..0dbb12a3645
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Counter.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A highly scalable counter. Thread safe.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class Counter {
+  private static final int MAX_CELLS_LENGTH = 1 << 20;
+  private static final int SUFFERABLE_SPIN_COUNT = 2;
+
+  private static class Cell {
+    // Pads are added around the value to avoid cache-line contention with
+    // another cell's value. The cache-line size is expected to be equal to
+    // or less than about 128 bytes (= 64 bits * 16).
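+    // (The unused long fields below are that padding: seven 64-bit words,
+    // i.e. 56 bytes, on each side of 'value', keeping neighbouring cells'
+    // values off the same cache line and so avoiding false sharing.)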
+
+    @SuppressWarnings("unused")
+    volatile long p0, p1, p2, p3, p4, p5, p6;
+    volatile long value;
+    @SuppressWarnings("unused")
+    volatile long q0, q1, q2, q3, q4, q5, q6;
+
+    static final AtomicLongFieldUpdater<Cell> valueUpdater =
+        AtomicLongFieldUpdater.newUpdater(Cell.class, "value");
+
+    Cell() {}
+
+    Cell(long initValue) {
+      value = initValue;
+    }
+
+    long get() {
+      return value;
+    }
+
+    boolean addAndIsCongested(long delta) {
+      for(int i = 0; i < SUFFERABLE_SPIN_COUNT; i++) {
+        if(add(delta)) {
+          return false;
+        }
+      }
+
+      while(! add(delta)) {}
+
+      return true;
+    }
+
+    boolean add(long delta) {
+      long current = value;
+      return valueUpdater.compareAndSet(this, current, current + delta);
+    }
+  }
+
+  private static class Container {
+    /** The length should be a power of 2. */
+    final Cell[] cells;
+
+    /** True if a new extended container is going to replace this. */
+    final AtomicBoolean demoted = new AtomicBoolean();
+
+    Container(Cell cell) {
+      this(new Cell[] { cell });
+    }
+
+    /**
+     * @param cells the length should be a power of 2
+     */
+    Container(Cell[] cells) {
+      this.cells = cells;
+    }
+  }
+
+  private final AtomicReference<Container> containerRef;
+
+  public Counter() {
+    this(new Cell());
+  }
+
+  public Counter(long initValue) {
+    this(new Cell(initValue));
+  }
+
+  private Counter(Cell initCell) {
+    containerRef = new AtomicReference<Container>(new Container(initCell));
+  }
+
+  private static int hash() {
+    return (int) Thread.currentThread().getId();
+  }
+
+  public void add(long delta) {
+    Container container = containerRef.get();
+    Cell[] cells = container.cells;
+    int index = hash() & (cells.length - 1);
+    Cell cell = cells[index];
+
+    if(cell.addAndIsCongested(delta) && cells.length < MAX_CELLS_LENGTH &&
+        container.demoted.compareAndSet(false, true)) {
+
+      if(containerRef.get() == container) {
+        Cell[] newCells = new Cell[cells.length * 2];
+        System.arraycopy(cells, 0, newCells, 0, cells.length);
+        for(int i = cells.length; i < newCells.length; i++) {
+          newCells[i] = new Cell();
+          // Fill all of the elements with instances up front. Creating a
+          // cell on demand and writing it into the array later would raise
+          // a visibility (happens-before) problem, because the array
+          // elements themselves are not volatile, so the relation has to be
+          // established by piggybacking on some other synchronization.
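+          // (Here the piggyback is the containerRef.compareAndSet() below:
+          // publishing the fully populated array through the AtomicReference
+          // guarantees that a reader which sees the new Container also sees
+          // every Cell stored into it above.)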
+        }
+        containerRef.compareAndSet(container, new Container(newCells));
+      }
+    }
+  }
+
+  public void increment() {
+    add(1);
+  }
+
+  public void decrement() {
+    add(-1);
+  }
+
+  public void set(long value) {
+    containerRef.set(new Container(new Cell(value)));
+  }
+
+  public long get() {
+    long sum = 0;
+    for(Cell cell : containerRef.get().cells) {
+      sum += cell.get();
+    }
+    return sum;
+  }
+
+  @Override
+  public String toString() {
+    Cell[] cells = containerRef.get().cells;
+
+    long min = Long.MAX_VALUE;
+    long max = Long.MIN_VALUE;
+    long sum = 0;
+
+    for(Cell cell : cells) {
+      long value = cell.get();
+      sum += value;
+      if(min > value) { min = value; }
+      if(max < value) { max = value; }
+    }
+
+    return new StringBuilder(100)
+        .append("[value=").append(sum)
+        .append(", cells=[length=").append(cells.length)
+        .append(", min=").append(min)
+        .append(", max=").append(max)
+        .append("]]").toString();
+  }
+}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCounter.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCounter.java
new file mode 100644
index 00000000000..a0f0552953d
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCounter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hbase.MediumTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestCounter {
+  private static final int[] THREAD_COUNTS = {1, 10, 100};
+  private static final int DATA_COUNT = 1000000;
+
+  private interface Operation {
+    void execute();
+  }
+
+  @Test
+  public void testIncrement() throws Exception {
+    for(int threadCount : THREAD_COUNTS) {
+      final Counter counter = new Counter();
+
+      execute(new Operation() {
+        @Override
+        public void execute() {
+          counter.increment();
+        }
+      }, threadCount);
+
+      Assert.assertEquals(threadCount * (long)DATA_COUNT, counter.get());
+    }
+  }
+
+  @Test
+  public void testIncrementAndGet() throws Exception {
+    for(int threadCount : THREAD_COUNTS) {
+      final Counter counter = new Counter();
+
+      execute(new Operation() {
+        @Override
+        public void execute() {
+          counter.increment();
+          counter.get();
+        }
+      }, threadCount);
+
+      Assert.assertEquals(threadCount * (long)DATA_COUNT, counter.get());
+    }
+  }
+
+  private static void execute(final Operation op, int threadCount)
+      throws InterruptedException {
+
+    final CountDownLatch prepareLatch = new CountDownLatch(threadCount);
+    final CountDownLatch startLatch = new CountDownLatch(1);
+    final CountDownLatch endLatch = new CountDownLatch(threadCount);
+
+    class OperationThread extends Thread {
+      @Override
+      public void run() {
+        try {
+          prepareLatch.countDown();
+          startLatch.await();
+
+          for(int i = 0; i < DATA_COUNT; i++) {
+            op.execute();
+          }
+
+          endLatch.countDown();
+        } catch(InterruptedException e) {}
+      }
+    }
+
+    for(int i = 0; i < threadCount; i++) {
+      new OperationThread().start();
+    }
+
+    prepareLatch.await();
+    startLatch.countDown();
+    endLatch.await();
+  }
+}
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ ... @@
     <dependency>
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>
     </dependency>
-    <dependency>
-      <groupId>com.github.stephenc.high-scale-lib</groupId>
-      <artifactId>high-scale-lib</artifactId>
-    </dependency>
     <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
     </dependency>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 15f9618f78f..46c8d11d109 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -109,7 +110,6 @@ import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
-import org.cliffc.high_scale_lib.Counter;
 import org.cloudera.htrace.TraceInfo;
 import org.codehaus.jackson.map.ObjectMapper;
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index f76f149a5fe..e5706fb2f30 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
-import org.cliffc.high_scale_lib.Counter;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -312,9 +311,6 @@ public class TableMapReduceUtil {
       HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
     job.getConfiguration().setFloat("hbase.offheapcache.percentage", 0f);
     job.getConfiguration().setFloat("hbase.bucketcache.size", 0f);
-
-    // We would need even more libraries that hbase-server depends on
-    TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Counter.class);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/metrics/ExactCounterMetric.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/metrics/ExactCounterMetric.java
index 715043fb50f..a82131c90cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/metrics/ExactCounterMetric.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/metrics/ExactCounterMetric.java
@@ -27,11 +27,11 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.util.MetricsBase;
 import org.apache.hadoop.metrics.util.MetricsRegistry;
-import org.cliffc.high_scale_lib.Counter;
 
 import com.google.common.collect.Lists;
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
index 078d83cb5c4..93512d6e916 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
@@ -356,9 +356,6 @@ public class CompactionTool extends Configured implements Tool {
     // add dependencies (including HBase ones)
     TableMapReduceUtil.addDependencyJars(job);
 
-    // This job instantiates HRegions, which requires the Counter class from the high_scale library
-    TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
-      org.cliffc.high_scale_lib.Counter.class);
     Path stagingDir = JobUtil.getStagingDir(conf);
     try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d5f0839a576..1cec3788e7d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -133,6 +133,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CompressionTest;
+import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HashedBytes;
@@ -140,7 +141,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.util.StringUtils;
-import org.cliffc.high_scale_lib.Counter;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -5357,9 +5357,9 @@ public class HRegion implements HeapSize { // , Writable{
   // woefully out of date - currently missing:
   // 1 x HashMap - coprocessorServiceHandlers
-  // 6 org.cliffc.high_scale_lib.Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
+  // 6 x Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
   // checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
-  // writeRequestsCount, updatesBlockedMs
+  // writeRequestsCount
   // 1 x HRegion$WriteState - writestate
   // 1 x RegionCoprocessorHost - coprocessorHost
   // 1 x RegionSplitPolicy - splitPolicy
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 73bf86ba5bc..e5eec504a7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -209,6 +209,7 @@ import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CompressionTest;
+import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -234,7 +235,6 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
-import org.cliffc.high_scale_lib.Counter;
 
 import com.google.protobuf.BlockingRpcChannel;
 import com.google.protobuf.ByteString;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 81bfc4f411e..1e22c46eda3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -49,9 +49,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
-import org.cliffc.high_scale_lib.Counter;
 import org.cloudera.htrace.Trace;
 import org.cloudera.htrace.TraceScope;
+import org.apache.hadoop.hbase.util.Counter;
 
 import com.google.common.base.Preconditions;
 
diff --git a/pom.xml b/pom.xml
index 3114b73ea3d..f58dd326ab6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1132,11 +1132,6 @@
         <artifactId>commons-cli</artifactId>
         <version>${commons-cli.version}</version>
       </dependency>
-      <dependency>
-        <groupId>com.github.stephenc.high-scale-lib</groupId>
-        <artifactId>high-scale-lib</artifactId>
-        <version>1.1.1</version>
-      </dependency>
       <dependency>
         <groupId>commons-codec</groupId>
         <artifactId>commons-codec</artifactId>
diff --git a/src/main/docbkx/developer.xml b/src/main/docbkx/developer.xml
index f0e48a8445a..8d5167f4af4 100644
--- a/src/main/docbkx/developer.xml
+++ b/src/main/docbkx/developer.xml
@@ -123,7 +123,6 @@ git clone git://github.com/apache/hbase.git
 Description Resource Path Location Type
 The project cannot be built until build path errors are resolved hbase Unknown Java Problem
 Unbound classpath variable: 'M2_REPO/asm/asm/3.1/asm-3.1.jar' in project 'hbase' hbase Build path Build Path Problem
-Unbound classpath variable: 'M2_REPO/com/github/stephenc/high-scale-lib/high-scale-lib/1.1.1/high-scale-lib-1.1.1.jar' in project 'hbase' hbase Build path Build Path Problem
 Unbound classpath variable: 'M2_REPO/com/google/guava/guava/r09/guava-r09.jar' in project 'hbase' hbase Build path Build Path Problem
 Unbound classpath variable: 'M2_REPO/com/google/protobuf/protobuf-java/2.3.0/protobuf-java-2.3.0.jar' in project 'hbase' hbase Build path Build Path Problem
 Unbound classpath variable:
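-- 
A minimal usage sketch of the replacement API (illustrative only, not part of
the patch; it exercises just the methods defined in the Counter.java added
above):

    Counter counter = new Counter();   // or new Counter(initialValue)
    counter.increment();               // hot path; contended threads spread across cells
    counter.add(41);
    counter.decrement();
    long value = counter.get();        // sums all cells (41 here); not a point-in-time
                                       // snapshot while writers are active
    counter.set(0);                    // swaps in a fresh single-cell container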