HBASE-10656 high-scale-lib's Counter depends on Oracle (Sun) JRE, and also has some bug
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1577759 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6bf39f5359
commit
a1a5f57bff
|
@ -0,0 +1,182 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* High scalable counter. Thread safe.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class Counter {
|
||||
private static final int MAX_CELLS_LENGTH = 1 << 20;
|
||||
private static final int SUFFERABLE_SPIN_COUNT = 2;
|
||||
|
||||
private static class Cell {
|
||||
// Pads are added around the value to avoid cache-line contention with
|
||||
// another cell's value. The cache-line size is expected to be equal to or
|
||||
// less than about 128 Bytes (= 64 Bits * 16).
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
volatile long p0, p1, p2, p3, p4, p5, p6;
|
||||
volatile long value;
|
||||
@SuppressWarnings("unused")
|
||||
volatile long q0, q1, q2, q3, q4, q5, q6;
|
||||
|
||||
static final AtomicLongFieldUpdater<Cell> valueUpdater =
|
||||
AtomicLongFieldUpdater.newUpdater(Cell.class, "value");
|
||||
|
||||
Cell() {}
|
||||
|
||||
Cell(long initValue) {
|
||||
value = initValue;
|
||||
}
|
||||
|
||||
long get() {
|
||||
return value;
|
||||
}
|
||||
|
||||
boolean addAndIsCongested(long delta) {
|
||||
for(int i = 0; i < SUFFERABLE_SPIN_COUNT; i++) {
|
||||
if(add(delta)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
while(! add(delta)) {}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
boolean add(long delta) {
|
||||
long current = value;
|
||||
return valueUpdater.compareAndSet(this, current, current + delta);
|
||||
}
|
||||
}
|
||||
|
||||
private static class Container {
|
||||
/** The length should be a power of 2. */
|
||||
final Cell[] cells;
|
||||
|
||||
/** True if a new extended container is going to replace this. */
|
||||
final AtomicBoolean demoted = new AtomicBoolean();
|
||||
|
||||
Container(Cell cell) {
|
||||
this(new Cell[] { cell });
|
||||
}
|
||||
|
||||
/**
|
||||
* @param cells the length should be a power of 2
|
||||
*/
|
||||
Container(Cell[] cells) {
|
||||
this.cells = cells;
|
||||
}
|
||||
}
|
||||
|
||||
private final AtomicReference<Container> containerRef;
|
||||
|
||||
public Counter() {
|
||||
this(new Cell());
|
||||
}
|
||||
|
||||
public Counter(long initValue) {
|
||||
this(new Cell(initValue));
|
||||
}
|
||||
|
||||
private Counter(Cell initCell) {
|
||||
containerRef = new AtomicReference<Container>(new Container(initCell));
|
||||
}
|
||||
|
||||
private static int hash() {
|
||||
return (int) Thread.currentThread().getId();
|
||||
}
|
||||
|
||||
public void add(long delta) {
|
||||
Container container = containerRef.get();
|
||||
Cell[] cells = container.cells;
|
||||
int index = hash() & (cells.length - 1);
|
||||
Cell cell = cells[index];
|
||||
|
||||
if(cell.addAndIsCongested(delta) && cells.length < MAX_CELLS_LENGTH &&
|
||||
container.demoted.compareAndSet(false, true)) {
|
||||
|
||||
if(containerRef.get() == container) {
|
||||
Cell[] newCells = new Cell[cells.length * 2];
|
||||
System.arraycopy(cells, 0, newCells, 0, cells.length);
|
||||
for(int i = cells.length; i < newCells.length; i++) {
|
||||
newCells[i] = new Cell();
|
||||
// Fill all of the elements with instances. Creating a cell on demand
|
||||
// and putting it into the array makes a concurrent problem about
|
||||
// visibility or, in other words, happens-before relation, because
|
||||
// each element of the array is not volatile so that you should
|
||||
// establish the relation by some piggybacking.
|
||||
}
|
||||
containerRef.compareAndSet(container, new Container(newCells));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void increment() {
|
||||
add(1);
|
||||
}
|
||||
|
||||
public void decrement() {
|
||||
add(-1);
|
||||
}
|
||||
|
||||
public void set(long value) {
|
||||
containerRef.set(new Container(new Cell(value)));
|
||||
}
|
||||
|
||||
public long get() {
|
||||
long sum = 0;
|
||||
for(Cell cell : containerRef.get().cells) {
|
||||
sum += cell.get();
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
Cell[] cells = containerRef.get().cells;
|
||||
|
||||
long min = Long.MAX_VALUE;
|
||||
long max = Long.MIN_VALUE;
|
||||
long sum = 0;
|
||||
|
||||
for(Cell cell : cells) {
|
||||
long value = cell.get();
|
||||
sum += value;
|
||||
if(min > value) { min = value; }
|
||||
if(max < value) { max = value; }
|
||||
}
|
||||
|
||||
return new StringBuilder(100)
|
||||
.append("[value=").append(sum)
|
||||
.append(", cells=[length=").append(cells.length)
|
||||
.append(", min=").append(min)
|
||||
.append(", max=").append(max)
|
||||
.append("]]").toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,100 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.hadoop.hbase.MediumTests;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestCounter {
|
||||
private static final int[] THREAD_COUNTS = {1, 10, 100};
|
||||
private static final int DATA_COUNT = 1000000;
|
||||
|
||||
private interface Operation {
|
||||
void execute();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrement() throws Exception {
|
||||
for(int threadCount : THREAD_COUNTS) {
|
||||
final Counter counter = new Counter();
|
||||
|
||||
execute(new Operation() {
|
||||
@Override
|
||||
public void execute() {
|
||||
counter.increment();
|
||||
}
|
||||
}, threadCount);
|
||||
|
||||
Assert.assertEquals(threadCount * (long)DATA_COUNT, counter.get());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementAndGet() throws Exception {
|
||||
for(int threadCount: THREAD_COUNTS) {
|
||||
final Counter counter = new Counter();
|
||||
|
||||
execute(new Operation() {
|
||||
@Override
|
||||
public void execute() {
|
||||
counter.increment();
|
||||
counter.get();
|
||||
}
|
||||
}, threadCount);
|
||||
|
||||
Assert.assertEquals(threadCount * (long)DATA_COUNT, counter.get());
|
||||
}
|
||||
}
|
||||
|
||||
private static void execute(final Operation op, int threadCount)
|
||||
throws InterruptedException {
|
||||
|
||||
final CountDownLatch prepareLatch = new CountDownLatch(threadCount);
|
||||
final CountDownLatch startLatch = new CountDownLatch(1);
|
||||
final CountDownLatch endLatch = new CountDownLatch(threadCount);
|
||||
|
||||
class OperationThread extends Thread {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
prepareLatch.countDown();
|
||||
startLatch.await();
|
||||
|
||||
for(int i=0; i<DATA_COUNT; i++) {
|
||||
op.execute();
|
||||
}
|
||||
|
||||
endLatch.countDown();
|
||||
|
||||
} catch(Exception e) {}
|
||||
}
|
||||
}
|
||||
|
||||
for(int j=0; j<threadCount; j++) {
|
||||
new OperationThread().start();
|
||||
}
|
||||
|
||||
prepareLatch.await();
|
||||
startLatch.countDown();
|
||||
endLatch.await();
|
||||
}
|
||||
}
|
|
@ -342,10 +342,6 @@
|
|||
<groupId>commons-cli</groupId>
|
||||
<artifactId>commons-cli</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.stephenc.high-scale-lib</groupId>
|
||||
<artifactId>high-scale-lib</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
|
|
|
@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.security.SaslUtil;
|
|||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Counter;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.io.BytesWritable;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
|
@ -109,7 +110,6 @@ import org.apache.hadoop.security.token.SecretManager;
|
|||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.cliffc.high_scale_lib.Counter;
|
||||
import org.cloudera.htrace.TraceInfo;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
|
|
|
@ -67,7 +67,6 @@ import org.apache.hadoop.mapreduce.Job;
|
|||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.cliffc.high_scale_lib.Counter;
|
||||
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
|
@ -312,9 +311,6 @@ public class TableMapReduceUtil {
|
|||
HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
|
||||
job.getConfiguration().setFloat("hbase.offheapcache.percentage", 0f);
|
||||
job.getConfiguration().setFloat("hbase.bucketcache.size", 0f);
|
||||
|
||||
// We would need even more libraries that hbase-server depends on
|
||||
TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Counter.class);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -27,11 +27,11 @@ import java.util.concurrent.ConcurrentMap;
|
|||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.hadoop.hbase.util.Counter;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.metrics.MetricsRecord;
|
||||
import org.apache.hadoop.metrics.util.MetricsBase;
|
||||
import org.apache.hadoop.metrics.util.MetricsRegistry;
|
||||
import org.cliffc.high_scale_lib.Counter;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
|
|
|
@ -356,9 +356,6 @@ public class CompactionTool extends Configured implements Tool {
|
|||
|
||||
// add dependencies (including HBase ones)
|
||||
TableMapReduceUtil.addDependencyJars(job);
|
||||
// This job instantiates HRegions, which requires the Counter class from the high_scale library
|
||||
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
|
||||
org.cliffc.high_scale_lib.Counter.class);
|
||||
|
||||
Path stagingDir = JobUtil.getStagingDir(conf);
|
||||
try {
|
||||
|
|
|
@ -133,6 +133,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.hadoop.hbase.util.CompressionTest;
|
||||
import org.apache.hadoop.hbase.util.Counter;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.HashedBytes;
|
||||
|
@ -140,7 +141,6 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.cliffc.high_scale_lib.Counter;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -5357,9 +5357,9 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
// woefully out of date - currently missing:
|
||||
// 1 x HashMap - coprocessorServiceHandlers
|
||||
// 6 org.cliffc.high_scale_lib.Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
|
||||
// 6 x Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
|
||||
// checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
|
||||
// writeRequestsCount, updatesBlockedMs
|
||||
// writeRequestsCount
|
||||
// 1 x HRegion$WriteState - writestate
|
||||
// 1 x RegionCoprocessorHost - coprocessorHost
|
||||
// 1 x RegionSplitPolicy - splitPolicy
|
||||
|
|
|
@ -209,6 +209,7 @@ import org.apache.hadoop.hbase.security.UserProvider;
|
|||
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CompressionTest;
|
||||
import org.apache.hadoop.hbase.util.Counter;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
@ -234,7 +235,6 @@ import org.apache.hadoop.util.ReflectionUtils;
|
|||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.cliffc.high_scale_lib.Counter;
|
||||
|
||||
import com.google.protobuf.BlockingRpcChannel;
|
||||
import com.google.protobuf.ByteString;
|
||||
|
|
|
@ -49,9 +49,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.cliffc.high_scale_lib.Counter;
|
||||
import org.cloudera.htrace.Trace;
|
||||
import org.cloudera.htrace.TraceScope;
|
||||
import org.apache.hadoop.hbase.util.Counter;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
|
|
5
pom.xml
5
pom.xml
|
@ -1132,11 +1132,6 @@
|
|||
<artifactId>commons-cli</artifactId>
|
||||
<version>${commons-cli.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.stephenc.high-scale-lib</groupId>
|
||||
<artifactId>high-scale-lib</artifactId>
|
||||
<version>1.1.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-codec</groupId>
|
||||
<artifactId>commons-codec</artifactId>
|
||||
|
|
|
@ -123,7 +123,6 @@ git clone git://github.com/apache/hbase.git
|
|||
Description Resource Path Location Type
|
||||
The project cannot be built until build path errors are resolved hbase Unknown Java Problem
|
||||
Unbound classpath variable: 'M2_REPO/asm/asm/3.1/asm-3.1.jar' in project 'hbase' hbase Build path Build Path Problem
|
||||
Unbound classpath variable: 'M2_REPO/com/github/stephenc/high-scale-lib/high-scale-lib/1.1.1/high-scale-lib-1.1.1.jar' in project 'hbase' hbase Build path Build Path Problem
|
||||
Unbound classpath variable: 'M2_REPO/com/google/guava/guava/r09/guava-r09.jar' in project 'hbase' hbase Build path Build Path Problem
|
||||
Unbound classpath variable: 'M2_REPO/com/google/protobuf/protobuf-java/2.3.0/protobuf-java-2.3.0.jar' in project 'hbase' hbase Build path Build Path Problem Unbound classpath variable:
|
||||
</programlisting>
|
||||
|
|
Loading…
Reference in New Issue