HBASE-6405 Create Hadoop compatibility modules and Metrics2 implementation of replication metrics (Elliot Clark)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1362681 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-07-17 22:02:06 +00:00
parent eeb9a275c8
commit c863b51baa
29 changed files with 1678 additions and 277 deletions

View File

@ -0,0 +1,63 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>hbase</artifactId>
<groupId>org.apache.hbase</groupId>
<version>0.95-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>hbase-hadoop-compat</artifactId>
<name>HBase - Hadoop Compatibility</name>
<description>
Interfaces to be implemented in order to smooth
over hadoop version differences
</description>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<!-- Always skip the second part executions, since we only run
simple unit tests in this module -->
<executions>
<execution>
<id>secondPartTestsExecution</id>
<phase>test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<skip>true</skip>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
</dependencies>
</project>

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.metrics;
/**
* Base interface for metrics sources that announce dynamically created metrics to Metrics2
*/
public interface BaseMetricsSource {
/**
* Set a gauge to a specific value.
*
* @param gaugeName the name of the gauge
* @param value the value
*/
public void setGauge(String gaugeName, long value);
/**
* Add some amount to a gauge.
*
* @param gaugeName the name of the gauge
* @param delta the amount to change the gauge by.
*/
public void incGauge(String gaugeName, long delta);
/**
* Subtract some amount from a gauge.
*
* @param gaugeName the name of the gauge
* @param delta the amount to change the gauge by.
*/
public void decGauge(String gaugeName, long delta);
/**
* Remove a gauge and no longer announce it.
*
* @param key Name of the gauge to remove.
*/
public void removeGauge(String key);
/**
* Add some amount to a counter.
*
* @param counterName the name of the counter
* @param delta the amount to change the counter by.
*/
public void incCounters(String counterName, long delta);
/**
* Remove a counter and stop announcing it to metrics2.
*
* @param key the name of the counter to remove.
*/
public void removeCounter(String key);
}
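
A caller never constructs these sources directly; it only pushes values through the interface. A minimal sketch of typical use (the method and metric names here are illustrative, not part of this commit):

    void onBatchApplied(BaseMetricsSource metrics, int batchSize, int queueDepth) {
      metrics.incCounters("appliedOps", batchSize);  // monotonically increasing count
      metrics.setGauge("queueDepth", queueDepth);    // point-in-time value
    }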

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver.metrics;
import org.apache.hadoop.hbase.metrics.BaseMetricsSource;
/**
* Provides access to gauges and counters. Implementers will hide the details of hadoop1 or
* hadoop2's metrics2 classes and publishing.
*/
public interface ReplicationMetricsSource extends BaseMetricsSource {
//Empty interface so that ServiceLoader can find the right implementation.
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver.metrics;
import java.util.ServiceLoader;
/**
* Class to load ReplicationMetricsSource from the class path. Will only return a singleton
* instance.
*/
public class ReplicationMetricsSourceFactory {
private static ReplicationMetricsSource rms = null;
public static final String EXCEPTION_STRING = "Could not create a Replication metrics source. " +
"Is the hadoop compatibility jar on the classpath?";
/**
* Get the singleton instance of ReplicationMetricsSource
*
* @return the singleton
*/
public static synchronized ReplicationMetricsSource getInstance() {
if (rms == null) {
try {
rms = ServiceLoader.load(ReplicationMetricsSource.class).iterator().next();
} catch (Exception e) {
throw new RuntimeException(EXCEPTION_STRING, e);
} catch (Error e) {
throw new RuntimeException(EXCEPTION_STRING, e);
}
// If nothing was loaded and no exception was thrown, fail explicitly.
if (rms == null) {
throw new RuntimeException(EXCEPTION_STRING);
}
}
return rms;
}
}
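
A hedged sketch of how callers are expected to use the factory; the counter name below is illustrative:

    // The first call scans the classpath via ServiceLoader; later calls
    // return the cached singleton.
    ReplicationMetricsSource rms = ReplicationMetricsSourceFactory.getInstance();
    rms.incCounters("sink.appliedBatches", 1);
    assert rms == ReplicationMetricsSourceFactory.getInstance();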

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver.metrics;
import org.junit.Test;
/**
* Test for the ReplicationMetricsSourceFactory
*/
public class ReplicationMetricsSourceFactoryTest {
@Test(expected=RuntimeException.class)
public void testGetInstanceNoHadoopCompat() throws Exception {
//This should throw an exception because there is no compat lib on the class path.
ReplicationMetricsSourceFactory.getInstance();
}
}

View File

@ -0,0 +1,103 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>hbase</artifactId>
<groupId>org.apache.hbase</groupId>
<version>0.95-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>hbase-hadoop1-compat</artifactId>
<name>HBase - Hadoop One Compatibility</name>
<description>
Interfaces to be implemented in order to smooth
over hadoop version differences
</description>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<!-- Always skip the second part executions, since we only run
simple unit tests in this module -->
<executions>
<execution>
<id>secondPartTestsExecution</id>
<phase>test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<skip>true</skip>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>${hadoop-one.version}</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>hsqldb</groupId>
<artifactId>hsqldb</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.kosmosfs</groupId>
<artifactId>kfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
<exclusion>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</exclusion>
<exclusion>
<groupId>oro</groupId>
<artifactId>oro</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-test</artifactId>
<version>${hadoop-one.version}</version>
<optional>true</optional>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,201 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.metrics;
import org.apache.hadoop.metrics2.MetricsBuilder;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Hadoop 1 implementation of BaseMetricsSource
*/
public class BaseMetricsSourceImpl implements BaseMetricsSource, MetricsSource {
private static boolean defaultMetricsSystemInited = false;
public static final String HBASE_METRICS_SYSTEM_NAME = "hbase";
public ConcurrentMap<String, MetricMutableGaugeLong>
gauges = new ConcurrentHashMap<String, MetricMutableGaugeLong>();
public ConcurrentMap<String, MetricMutableCounterLong> counters =
new ConcurrentHashMap<String, MetricMutableCounterLong>();
protected String metricsContext;
protected String metricsName;
protected String metricsDescription;
public BaseMetricsSourceImpl(
String metricsName,
String metricsDescription,
String metricsContext) {
this.metricsContext = metricsContext;
this.metricsName = metricsName;
this.metricsDescription = metricsDescription;
if (!defaultMetricsSystemInited) {
//Not too worried about multi-threading here as the worst a race can do is spam the logs.
defaultMetricsSystemInited = true;
DefaultMetricsSystem.initialize(HBASE_METRICS_SYSTEM_NAME);
}
//Register this instance.
DefaultMetricsSystem.registerSource(this.metricsContext, this.metricsDescription, this);
}
/**
* Set a single gauge to a value.
*
* @param gaugeName gauge name
* @param value the new value of the gauge.
*/
public void setGauge(String gaugeName, long value) {
MetricMutableGaugeLong gaugeInt = getLongGauge(gaugeName, value);
gaugeInt.set(value);
}
/**
* Add some amount to a gauge.
*
* @param gaugeName The name of the gauge to increment.
* @param delta The amount to increment the gauge by.
*/
public void incGauge(String gaugeName, long delta) {
MetricMutableGaugeLong gaugeInt = getLongGauge(gaugeName, 0L);
gaugeInt.incr(delta);
}
/**
* Decrease the value of a named gauge.
*
* @param gaugeName The name of the gauge.
* @param delta the amount to subtract from the gauge value.
*/
public void decGauge(String gaugeName, long delta) {
MetricMutableGaugeLong gaugeInt = getLongGauge(gaugeName, 0L);
gaugeInt.decr(delta);
}
/**
* Increment a named counter by some value.
*
* @param key the name of the counter
* @param delta the amount to increment by
*/
public void incCounters(String key, long delta) {
MetricMutableCounterLong counter = getLongCounter(key, 0L);
counter.incr(delta);
}
/**
* Remove a named gauge.
*
* @param key the name of the gauge to remove.
*/
public void removeGauge(String key) {
gauges.remove(key);
}
/**
* Remove a named counter.
*
* @param key the name of the counter to remove.
*/
public void removeCounter(String key) {
counters.remove(key);
}
/**
* Method to export all the metrics.
*
* @param metricsBuilder Builder to accept metrics
* @param all whether to push all metrics or only those that changed
*/
@Override
public void getMetrics(MetricsBuilder metricsBuilder, boolean all) {
MetricsRecordBuilder rb = metricsBuilder.addRecord(metricsName).setContext(metricsContext);
for (Map.Entry<String, MetricMutableCounterLong> entry : counters.entrySet()) {
entry.getValue().snapshot(rb, all);
}
for (Map.Entry<String, MetricMutableGaugeLong> entry : gauges.entrySet()) {
entry.getValue().snapshot(rb, all);
}
}
/**
* Get a MetricMutableGaugeLong from the storage. If it is not there, atomically create and insert it.
*
* @param gaugeName name of the gauge to create or get.
* @param potentialStartingValue value of the new counter if we have to create it.
* @return the gauge stored under this name, created if it was absent
*/
private MetricMutableGaugeLong getLongGauge(String gaugeName, long potentialStartingValue) {
//Try to get the gauge.
MetricMutableGaugeLong gauge = gauges.get(gaugeName);
//If it's not there then try and put a new one in the storage.
if (gauge == null) {
//Create the potential new gauge.
MetricMutableGaugeLong newGauge = new MetricMutableGaugeLong(gaugeName, "",
potentialStartingValue);
// Try and put the gauge in. This is atomic.
gauge = gauges.putIfAbsent(gaugeName, newGauge);
//If putIfAbsent returned null then our new gauge was stored, so use it;
//otherwise it returned the gauge another thread stored before ours.
if (gauge == null) {
gauge = newGauge;
}
}
return gauge;
}
/**
* Get a MetricMutableCounterLong from the storage. If it is not there, atomically create and insert it.
*
* @param counterName Name of the counter to get
* @param potentialStartingValue starting value if we have to create a new counter
* @return the counter stored under this name, created if it was absent
*/
private MetricMutableCounterLong getLongCounter(String counterName, long potentialStartingValue) {
//See getLongGauge for description on how this works.
MetricMutableCounterLong counter = counters.get(counterName);
if (counter == null) {
MetricMutableCounterLong newCounter =
new MetricMutableCounterLong(counterName, "", potentialStartingValue);
counter = counters.putIfAbsent(counterName, newCounter);
if (counter == null) {
counter = newCounter;
}
}
return counter;
}
}
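
The get-or-create methods above avoid locks by speculatively constructing the metric and letting ConcurrentMap.putIfAbsent arbitrate races. A standalone sketch of the same idiom, using AtomicLong as a stand-in value type:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicLong;

    class GetOrCreateExample {
      private final ConcurrentMap<String, AtomicLong> values =
          new ConcurrentHashMap<String, AtomicLong>();

      AtomicLong getOrCreate(String name) {
        AtomicLong existing = values.get(name);            // fast path: already present
        if (existing == null) {
          AtomicLong candidate = new AtomicLong(0L);       // speculative create
          existing = values.putIfAbsent(name, candidate);  // atomic insert-if-absent
          if (existing == null) {
            existing = candidate;                          // our candidate won the race
          }
        }
        return existing;  // either ours, or the one another thread stored first
      }
    }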

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver.metrics;
import org.apache.hadoop.hbase.metrics.BaseMetricsSourceImpl;
import org.apache.hadoop.metrics2.MetricsSource;
/**
* Hadoop1 implementation of ReplicationMetricsSource. This provides access to metrics gauges and
* counters.
*/
public class ReplicationMetricsSourceImpl extends BaseMetricsSourceImpl implements
ReplicationMetricsSource {
public static final String METRICS_NAME = "ReplicationMetrics";
public static final String METRICS_CONTEXT = "replicationmetrics";
public static final String METRICS_DESCRIPTION = "Metrics about HBase replication";
public ReplicationMetricsSourceImpl() {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT);
}
ReplicationMetricsSourceImpl(String metricsName,
String metricsDescription, String metricsContext) {
super(metricsName, metricsDescription, metricsContext);
}
}

View File

@ -0,0 +1 @@
org.apache.hadoop.hbase.replication.regionserver.metrics.ReplicationMetricsSourceImpl
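
The file above is a standard java.util.ServiceLoader provider-configuration file: its name (under META-INF/services/) is the fully qualified interface, and each line names an implementation. A sketch of the lookup that ReplicationMetricsSourceFactory performs against it:

    // Scans every META-INF/services/...ReplicationMetricsSource file on the
    // classpath and instantiates the first listed implementation; throws
    // NoSuchElementException when no compat jar is present.
    ReplicationMetricsSource source =
        ServiceLoader.load(ReplicationMetricsSource.class).iterator().next();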

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.metrics;
import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
/**
* Test of the default BaseMetricsSource implementation for hadoop 1
*/
public class BaseMetricsSourceImplTest {
private static BaseMetricsSourceImpl bmsi;
@BeforeClass
public static void setUp() throws Exception {
bmsi = new BaseMetricsSourceImpl("TestName", "test description", "testcontext");
}
@Test
public void testSetGauge() throws Exception {
String key = "testset";
bmsi.setGauge(key, 100);
MetricMutableGaugeLong g = bmsi.gauges.get(key);
assertEquals(key, g.name);
bmsi.setGauge(key, 110);
assertSame(g, bmsi.gauges.get(key));
}
@Test
public void testIncGauge() throws Exception {
String key = "testincgauge";
bmsi.incGauge(key, 100);
MetricMutableGaugeLong g = bmsi.gauges.get(key);
assertEquals(key, g.name);
bmsi.incGauge(key, 10);
assertSame(g, bmsi.gauges.get(key));
}
@Test
public void testDecGauge() throws Exception {
String key = "testdec";
bmsi.decGauge(key, 100);
MetricMutableGaugeLong g = bmsi.gauges.get(key);
assertEquals(key, g.name);
bmsi.decGauge(key, 100);
assertSame(g, bmsi.gauges.get(key));
}
@Test
public void testIncCounters() throws Exception {
String key = "testinccounter";
bmsi.incCounters(key, 100);
MetricMutableCounterLong c = bmsi.counters.get(key);
assertEquals(key, c.name);
bmsi.incCounters(key, 100);
assertSame(c, bmsi.counters.get(key));
}
@Test
public void testRemoveGauge() throws Exception {
bmsi.setGauge("testrm", 100);
bmsi.removeGauge("testrm");
assertNull(bmsi.gauges.get("testrm"));
}
@Test
public void testRemoveCounter() throws Exception {
bmsi.incCounters("testrm", 100);
bmsi.removeCounter("testrm");
assertNull(bmsi.counters.get("testrm"));
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver.metrics;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
/**
* Test to make sure that ReplicationMetricsSourceImpl is hooked up to ServiceLoader
*/
public class ReplicationMetricsSourceImplTest {
@Test
public void testGetInstance() throws Exception {
ReplicationMetricsSource rms = ReplicationMetricsSourceFactory.getInstance();
assertTrue(rms instanceof ReplicationMetricsSourceImpl);
}
}

View File

@ -0,0 +1,105 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>hbase</artifactId>
<groupId>org.apache.hbase</groupId>
<version>0.95-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>hbase-hadoop2-compat</artifactId>
<name>HBase - Hadoop Two Compatibility</name>
<description>
Interfaces to be implemented in order to smooth
over hadoop version differences
</description>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<!-- Always skip the second part executions, since we only run
simple unit tests in this module -->
<executions>
<execution>
<id>secondPartTestsExecution</id>
<phase>test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<skip>true</skip>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>create-mrapp-generated-classpath</id>
<phase>generate-test-resources</phase>
<goals>
<goal>build-classpath</goal>
</goals>
<configuration>
<!-- needed to run the unit test for DS to generate
the required classpath that is required in the env
of the launch container in the mini mr/yarn cluster
-->
<outputFile>
${project.build.directory}/test-classes/mrapp-generated-classpath
</outputFile>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-two.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<version>${hadoop-two.version}</version>
</dependency>
<!-- This was marked as test dep in earlier pom, but was scoped compile.
Where do we actually need it? -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>${hadoop-two.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,193 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.metrics;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.HBaseMetricsFactory;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/** Hadoop 2 implementation of BaseMetricsSource */
public class BaseMetricsSourceImpl implements BaseMetricsSource, MetricsSource {
private static boolean defaultMetricsSystemInited = false;
public static final String HBASE_METRICS_SYSTEM_NAME = "hbase";
public ConcurrentMap<String, MutableGaugeLong>
gauges = new ConcurrentHashMap<String, MutableGaugeLong>();
public ConcurrentMap<String, MutableCounterLong> counters =
new ConcurrentHashMap<String, MutableCounterLong>();
protected String metricsContext;
protected String metricsName;
protected String metricsDescription;
public BaseMetricsSourceImpl(String metricsName,
String metricsDescription,
String metricsContext) {
this.metricsContext = metricsContext;
this.metricsName = metricsName;
this.metricsDescription = metricsDescription;
if (!defaultMetricsSystemInited) {
//Not too worried about multi-threading here as the worst a race can do is spam the logs.
defaultMetricsSystemInited = true;
DefaultMetricsSystem.initialize(HBASE_METRICS_SYSTEM_NAME);
}
DefaultMetricsSystem.instance().register(this.metricsContext, this.metricsDescription, this);
}
/**
* Set a single gauge to a value.
*
* @param gaugeName gauge name
* @param value the new value of the gauge.
*/
public void setGauge(String gaugeName, long value) {
MutableGaugeLong gaugeInt = getLongGauge(gaugeName, value);
gaugeInt.set(value);
}
/**
* Add some amount to a gauge.
*
* @param gaugeName The name of the gauge to increment.
* @param delta The amount to increment the gauge by.
*/
public void incGauge(String gaugeName, long delta) {
MutableGaugeLong gaugeInt = getLongGauge(gaugeName, 0L);
gaugeInt.incr(delta);
}
/**
* Decrease the value of a named gauge.
*
* @param gaugeName The name of the gauge.
* @param delta the amount to subtract from the gauge value.
*/
public void decGauge(String gaugeName, long delta) {
MutableGaugeLong gaugeInt = getLongGauge(gaugeName, 0L);
gaugeInt.decr(delta);
}
/**
* Increment a named counter by some value.
*
* @param key the name of the counter
* @param delta the amount to increment by
*/
public void incCounters(String key, long delta) {
MutableCounterLong counter = getLongCounter(key, 0L);
counter.incr(delta);
}
/**
* Remove a named gauge.
*
* @param key the name of the gauge to remove.
*/
public void removeGauge(String key) {
gauges.remove(key);
}
/**
* Remove a named counter.
*
* @param key the name of the counter to remove.
*/
public void removeCounter(String key) {
counters.remove(key);
}
@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
MetricsRecordBuilder rb =
metricsCollector.addRecord(this.metricsName).setContext(metricsContext);
for (Map.Entry<String, MutableCounterLong> entry : counters.entrySet()) {
entry.getValue().snapshot(rb, all);
}
for (Map.Entry<String, MutableGaugeLong> entry : gauges.entrySet()) {
entry.getValue().snapshot(rb, all);
}
}
/**
* Get a MutableGaugeLong from the storage. If it is not there, atomically create and insert it.
*
* @param gaugeName name of the gauge to create or get.
* @param potentialStartingValue value of the new counter if we have to create it.
* @return the gauge stored under this name, created if it was absent
*/
private MutableGaugeLong getLongGauge(String gaugeName, long potentialStartingValue) {
//Try to get the gauge.
MutableGaugeLong gaugeInt = gauges.get(gaugeName);
//If it's not there then try and put a new one in the storage.
if (gaugeInt == null) {
//Create the potential new gauge.
MutableGaugeLong newGauge = HBaseMetricsFactory.newGauge(gaugeName,
"",
potentialStartingValue);
// Try and put the gauge in. This is atomic.
gaugeInt = gauges.putIfAbsent(gaugeName, newGauge);
//If putIfAbsent returned null then our new gauge was stored, so use it;
//otherwise it returned the gauge another thread stored before ours.
if (gaugeInt == null) {
gaugeInt = newGauge;
}
}
return gaugeInt;
}
/**
* Get a MutableCounterLong from the storage. If it is not there, atomically create and insert it.
*
* @param counterName Name of the counter to get
* @param potentialStartingValue starting value if we have to create a new counter
* @return the counter stored under this name, created if it was absent
*/
private MutableCounterLong getLongCounter(String counterName, long potentialStartingValue) {
//See getLongGauge for description on how this works.
MutableCounterLong counter = counters.get(counterName);
if (counter == null) {
MutableCounterLong newCounter =
HBaseMetricsFactory.newCounter(counterName, "", potentialStartingValue);
counter = counters.putIfAbsent(counterName, newCounter);
if (counter == null) {
counter = newCounter;
}
}
return counter;
}
}
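
Construction registers the source with the DefaultMetricsSystem, which later polls getMetrics() on its own schedule; user code only ever touches the gauge and counter methods. A sketch (all names below are illustrative):

    BaseMetricsSourceImpl source =
        new BaseMetricsSourceImpl("ExampleName", "example description", "examplecontext");
    source.incCounters("ops", 5);    // counter is created on first use, then incremented
    source.setGauge("inFlight", 2);  // gauge is created (or updated) with the given value
    // The metrics system eventually calls source.getMetrics(collector, all),
    // snapshotting every registered counter and gauge into a metrics record.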

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver.metrics;
import org.apache.hadoop.hbase.metrics.BaseMetricsSourceImpl;
import org.apache.hadoop.metrics2.MetricsSource;
/**
* Hadoop2 implementation of ReplicationMetricsSource. This provides access to metrics gauges and
* counters.
*/
public class ReplicationMetricsSourceImpl extends BaseMetricsSourceImpl implements
ReplicationMetricsSource {
public static final String METRICS_NAME = "ReplicationMetrics";
public static final String METRICS_CONTEXT = "replicationmetrics";
public static final String METRICS_DESCRIPTION = "Metrics about HBase replication";
public ReplicationMetricsSourceImpl() {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT);
}
ReplicationMetricsSourceImpl(String metricsName,
String metricsDescription, String metricsContext) {
super(metricsName, metricsDescription, metricsContext);
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.lib;
/**
* Factory providing static methods to create mutable metrics (gauges and counters).
* HBase uses this class rather than MetricsRegistry because MetricsRegistry does not
* allow metrics to be removed.
*/
public class HBaseMetricsFactory {
/**
* Create a new gauge
* @param name Name of the gauge
* @param desc Description of the gauge
* @param startingValue The starting value
* @return a new MutableGaugeLong that has a starting value.
*/
public static MutableGaugeLong newGauge(String name, String desc, long startingValue) {
return new MutableGaugeLong(Interns.info(name, desc), startingValue);
}
/**
* Create a new counter.
* @param name Name of the counter.
* @param desc Description of the counter.
* @param startingValue The starting value.
* @return a new MutableCounterLong that has a starting value.
*/
public static MutableCounterLong newCounter(String name, String desc, long startingValue) {
return new MutableCounterLong(Interns.info(name, desc), startingValue);
}
}
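
Usage is one line per metric; Interns.info caches the name/description pair so repeated creations share MetricsInfo instances. The metric names below are illustrative:

    MutableGaugeLong queueSize = HBaseMetricsFactory.newGauge("queueSize", "items queued", 0L);
    MutableCounterLong applied = HBaseMetricsFactory.newCounter("appliedOps", "ops applied", 0L);
    queueSize.set(42L);
    applied.incr(10L);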

View File

@ -0,0 +1 @@
org.apache.hadoop.hbase.replication.regionserver.metrics.ReplicationMetricsSourceImpl

View File

@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.metrics;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
* Test of default BaseMetricsSource for hadoop 2
*/
public class BaseMetricsSourceImplTest {
private static BaseMetricsSourceImpl bmsi;
@BeforeClass
public static void setUp() throws Exception {
bmsi = new BaseMetricsSourceImpl("TestName", "test description", "testcontext");
}
@Test
public void testSetGauge() throws Exception {
bmsi.setGauge("testset", 100);
assertEquals(100, bmsi.gauges.get("testset").value());
bmsi.setGauge("testset", 300);
assertEquals(300, bmsi.gauges.get("testset").value());
}
@Test
public void testIncGauge() throws Exception {
bmsi.incGauge("testincgauge", 100);
assertEquals(100, bmsi.gauges.get("testincgauge").value());
bmsi.incGauge("testincgauge", 100);
assertEquals(200, bmsi.gauges.get("testincgauge").value());
}
@Test
public void testDecGauge() throws Exception {
bmsi.decGauge("testdec", 100);
assertEquals(-100, bmsi.gauges.get("testdec").value());
bmsi.decGauge("testdec", 100);
assertEquals(-200, bmsi.gauges.get("testdec").value());
}
@Test
public void testIncCounters() throws Exception {
bmsi.incCounters("testinccounter", 100);
assertEquals(100, bmsi.counters.get("testinccounter").value());
bmsi.incCounters("testinccounter", 100);
assertEquals(200, bmsi.counters.get("testinccounter").value());
}
@Test
public void testRemoveGauge() throws Exception {
bmsi.setGauge("testrmgauge", 100);
bmsi.removeGauge("testrmgauge");
assertNull(bmsi.gauges.get("testrmgauge"));
}
@Test
public void testRemoveCounter() throws Exception {
bmsi.incCounters("testrmcounter", 100);
bmsi.removeCounter("testrmcounter");
assertNull(bmsi.counters.get("testrmcounter"));
}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver.metrics;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
/** Test for ReplicationMetricsSourceImpl */
public class ReplicationMetricsSourceImplTest {
@Test
public void testGetInstance() throws Exception {
ReplicationMetricsSource rms = ReplicationMetricsSourceFactory.getInstance();
assertTrue(rms instanceof ReplicationMetricsSourceImpl);
}
}

View File

@ -288,6 +288,15 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>${compat.module}</artifactId>
<version>${project.version}</version>
</dependency>
<!-- General dependencies -->
<dependency>
<groupId>io.netty</groupId>

View File

@ -19,13 +19,6 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -39,8 +32,16 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.metrics.ReplicationSinkMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
/**
* This class is responsible for replicating the edits coming
* from another cluster.
@ -133,7 +134,7 @@ public class ReplicationSink {
}
this.metrics.setAgeOfLastAppliedOp(
entries[entries.length-1].getKey().getWriteTime());
this.metrics.appliedBatchesRate.inc(1);
this.metrics.applyBatch(entries.length);
LOG.info("Total replicated: " + totalReplicated);
} catch (IOException ex) {
LOG.error("Unable to accept edit because:", ex);
@ -173,7 +174,6 @@ public class ReplicationSink {
try {
table = this.pool.getTable(tableName);
table.batch(rows);
this.metrics.appliedOpsRate.inc(rows.size());
} catch (InterruptedException ix) {
throw new IOException(ix);
} finally {

View File

@ -1,82 +0,0 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.metrics.MetricsRate;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
import org.apache.hadoop.metrics.util.MetricsIntValue;
import org.apache.hadoop.metrics.util.MetricsLongValue;
import org.apache.hadoop.metrics.util.MetricsRegistry;
/**
* This class is for maintaining the various replication statistics
* for a sink and publishing them through the metrics interfaces.
*/
@InterfaceAudience.Private
public class ReplicationSinkMetrics implements Updater {
private final MetricsRecord metricsRecord;
private MetricsRegistry registry = new MetricsRegistry();
/** Rate of operations applied by the sink */
public final MetricsRate appliedOpsRate =
new MetricsRate("appliedOpsRate", registry);
/** Rate of batches (of operations) applied by the sink */
public final MetricsRate appliedBatchesRate =
new MetricsRate("appliedBatchesRate", registry);
/** Age of the last operation that was applied by the sink */
private final MetricsLongValue ageOfLastAppliedOp =
new MetricsLongValue("ageOfLastAppliedOp", registry);
/**
* Constructor used to register the metrics
*/
public ReplicationSinkMetrics() {
MetricsContext context = MetricsUtil.getContext("hbase");
String name = Thread.currentThread().getName();
metricsRecord = MetricsUtil.createRecord(context, "replication");
metricsRecord.setTag("RegionServer", name);
context.registerUpdater(this);
// export for JMX
new ReplicationStatistics(this.registry, "ReplicationSink");
}
/**
* Set the age of the last edit that was applied
* @param timestamp write time of the edit
*/
public void setAgeOfLastAppliedOp(long timestamp) {
ageOfLastAppliedOp.set(System.currentTimeMillis() - timestamp);
}
@Override
public void doUpdates(MetricsContext metricsContext) {
synchronized (this) {
this.appliedOpsRate.pushMetric(this.metricsRecord);
this.appliedBatchesRate.pushMetric(this.metricsRecord);
this.ageOfLastAppliedOp.pushMetric(this.metricsRecord);
}
this.metricsRecord.update();
}
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.replication.regionserver.metrics.ReplicationSourceMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@ -238,7 +239,7 @@ public class ReplicationSource extends Thread
@Override
public void enqueueLog(Path log) {
this.queue.put(log);
this.metrics.sizeOfLogQueue.set(queue.size());
this.metrics.setSizeOfLogQueue(queue.size());
}
@Override
@ -246,6 +247,7 @@ public class ReplicationSource extends Thread
connectToPeers();
// We were stopped while looping to connect to sinks, just abort
if (!this.isActive()) {
metrics.clear();
return;
}
// delay this until we are in an asynchronous thread
@ -376,6 +378,7 @@ public class ReplicationSource extends Thread
}
}
LOG.debug("Source exiting " + peerId);
metrics.clear();
}
/**
@ -393,7 +396,7 @@ public class ReplicationSource extends Thread
HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
while (entry != null) {
WALEdit edit = entry.getEdit();
this.metrics.logEditsReadRate.inc(1);
this.metrics.incrLogEditsRead();
seenEntries++;
// Remove all KVs that should not be replicated
HLogKey logKey = entry.getKey();
@ -415,7 +418,7 @@ public class ReplicationSource extends Thread
currentNbOperations += countDistinctRowKeys(edit);
currentNbEntries++;
} else {
this.metrics.logEditsFilteredRate.inc(1);
this.metrics.incrLogEditsFiltered();
}
}
// Stop if too many entries or too big
@ -455,7 +458,7 @@ public class ReplicationSource extends Thread
try {
if (this.currentPath == null) {
this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
this.metrics.sizeOfLogQueue.set(queue.size());
this.metrics.setSizeOfLogQueue(queue.size());
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while reading edits", e);
@ -616,9 +619,7 @@ public class ReplicationSource extends Thread
this.lastLoggedPosition = this.position;
}
this.totalReplicatedEdits += currentNbEntries;
this.metrics.shippedBatchesRate.inc(1);
this.metrics.shippedOpsRate.inc(
this.currentNbOperations);
this.metrics.shipBatch(this.currentNbOperations);
this.metrics.setAgeOfLastShippedOp(
this.entriesArray[this.entriesArray.length-1].getKey().getWriteTime());
LOG.debug("Replicated in total: " + this.totalReplicatedEdits);

View File

@ -1,125 +0,0 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.metrics.MetricsRate;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
import org.apache.hadoop.metrics.util.MetricsIntValue;
import org.apache.hadoop.metrics.util.MetricsLongValue;
import org.apache.hadoop.metrics.util.MetricsRegistry;
/**
* This class is for maintaining the various replication statistics
* for a source and publishing them through the metrics interfaces.
*/
@InterfaceAudience.Private
public class ReplicationSourceMetrics implements Updater {
private final MetricsRecord metricsRecord;
private MetricsRegistry registry = new MetricsRegistry();
/** Rate of shipped operations by the source */
public final MetricsRate shippedOpsRate =
new MetricsRate("shippedOpsRate", registry);
/** Rate of shipped batches by the source */
public final MetricsRate shippedBatchesRate =
new MetricsRate("shippedBatchesRate", registry);
/** Rate of log entries (can be multiple Puts) read from the logs */
public final MetricsRate logEditsReadRate =
new MetricsRate("logEditsReadRate", registry);
/** Rate of log entries filtered by the source */
public final MetricsRate logEditsFilteredRate =
new MetricsRate("logEditsFilteredRate", registry);
/** Age of the last operation that was shipped by the source */
private final MetricsLongValue ageOfLastShippedOp =
new MetricsLongValue("ageOfLastShippedOp", registry);
/**
* Current size of the queue of logs to replicate,
* excluding the one being processed at the moment
*/
public final MetricsIntValue sizeOfLogQueue =
new MetricsIntValue("sizeOfLogQueue", registry);
// It's a little dirty to preset the age to now since if we fail
// to replicate the very first time then it will show that age instead
// of nothing (although that might not be good either).
private long lastTimestampForAge = System.currentTimeMillis();
/**
* Constructor used to register the metrics
* @param id Name of the source this class is monitoring
*/
public ReplicationSourceMetrics(String id) {
MetricsContext context = MetricsUtil.getContext("hbase");
String name = Thread.currentThread().getName();
metricsRecord = MetricsUtil.createRecord(context, "replication");
metricsRecord.setTag("RegionServer", name);
context.registerUpdater(this);
try {
id = URLEncoder.encode(id, "UTF8");
} catch (UnsupportedEncodingException e) {
id = "CAN'T ENCODE UTF8";
}
// export for JMX
new ReplicationStatistics(this.registry, "ReplicationSource for " + id);
}
/**
* Set the age of the last edit that was shipped
* @param timestamp write time of the edit
*/
public void setAgeOfLastShippedOp(long timestamp) {
lastTimestampForAge = timestamp;
ageOfLastShippedOp.set(System.currentTimeMillis() - lastTimestampForAge);
}
/**
* Convenience method to use the last given timestamp to refresh the age
* of the last edit. Used when replication fails and need to keep that
* metric accurate.
*/
public void refreshAgeOfLastShippedOp() {
setAgeOfLastShippedOp(lastTimestampForAge);
}
@Override
public void doUpdates(MetricsContext metricsContext) {
synchronized (this) {
this.shippedOpsRate.pushMetric(this.metricsRecord);
this.shippedBatchesRate.pushMetric(this.metricsRecord);
this.logEditsReadRate.pushMetric(this.metricsRecord);
this.logEditsFilteredRate.pushMetric(this.metricsRecord);
this.ageOfLastShippedOp.pushMetric(this.metricsRecord);
this.sizeOfLogQueue.pushMetric(this.metricsRecord);
}
this.metricsRecord.update();
}
}

View File

@ -1,47 +0,0 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.metrics.MetricsMBeanBase;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.metrics.util.MetricsRegistry;
import javax.management.ObjectName;
/**
* Exports metrics recorded by {@link ReplicationSourceMetrics} as an MBean
* for JMX monitoring.
*/
@InterfaceAudience.Private
public class ReplicationStatistics extends MetricsMBeanBase {
private final ObjectName mbeanName;
/**
* Constructor to register the MBean
* @param registry which registry to use
* @param name name to register this bean under
*/
public ReplicationStatistics(MetricsRegistry registry, String name) {
super(registry, name);
mbeanName = MBeanUtil.registerMBean("Replication", name, this);
}
}

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver.metrics;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* This class is for maintaining the various replication statistics for a sink and publishing them
* through the metrics interfaces.
*/
@InterfaceAudience.Private
public class ReplicationSinkMetrics {
public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp";
public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches";
public static final String SINK_APPLIED_OPS = "sink.appliedOps";
private ReplicationMetricsSource rms;
public ReplicationSinkMetrics() {
rms = ReplicationMetricsSourceFactory.getInstance();
}
/**
* Set the age of the last applied operation
*
* @param timestamp The timestamp of the last operation applied.
*/
public void setAgeOfLastAppliedOp(long timestamp) {
long age = System.currentTimeMillis() - timestamp;
rms.setGauge(SINK_AGE_OF_LAST_APPLIED_OP, age);
}
/**
* Convenience method to update metrics when a batch of operations is applied.
*
* @param batchSize the number of operations in the applied batch
*/
public void applyBatch(long batchSize) {
rms.incCounters(SINK_APPLIED_BATCHES, 1);
rms.incCounters(SINK_APPLIED_OPS, batchSize);
}
}
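
Seen from ReplicationSink (changed earlier in this commit), usage reduces to two calls per batch; entries here stands in for the array of applied WAL entries:

    ReplicationSinkMetrics metrics = new ReplicationSinkMetrics();
    // after a batch of WAL entries has been applied:
    metrics.applyBatch(entries.length);  // one batch, entries.length operations
    metrics.setAgeOfLastAppliedOp(entries[entries.length - 1].getKey().getWriteTime());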

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver.metrics;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* This class is for maintaining the various replication statistics for a source and publishing them
* through the metrics interfaces.
*/
@InterfaceAudience.Private
public class ReplicationSourceMetrics {
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
public static final String SOURCE_LOG_EDITS_READ = "source.logEditsRead";
public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered";
public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
public static final Log LOG = LogFactory.getLog(ReplicationSourceMetrics.class);
private String id;
private long lastTimestamp = 0;
private int lastQueueSize = 0;
private final String sizeOfLogQueueKey;
private final String ageOfLastShippedOpKey;
private final String logEditsReadKey;
private final String logEditsFilteredKey;
private final String shippedBatchesKey;
private final String shippedOpsKey;
private ReplicationMetricsSource rms;
/**
* Constructor used to register the metrics
*
* @param id Name of the source this class is monitoring
*/
public ReplicationSourceMetrics(String id) {
this.id = id;
sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue";
ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
logEditsReadKey = "source." + id + ".logEditsRead";
logEditsFilteredKey = "source." + id + ".logEditsFiltered";
shippedBatchesKey = "source." + id + ".shippedBatches";
shippedOpsKey = "source." + id + ".shippedOps";
rms = ReplicationMetricsSourceFactory.getInstance();
}
/**
* Set the age of the last edit that was shipped
*
* @param timestamp write time of the edit
*/
public void setAgeOfLastShippedOp(long timestamp) {
long age = System.currentTimeMillis() - timestamp;
rms.setGauge(ageOfLastShippedOpKey, age);
rms.setGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, age);
this.lastTimestamp = timestamp;
}
/**
* Convenience method to use the last given timestamp to refresh the age of the last edit. Used
* when replication fails and we need to keep that metric accurate.
*/
public void refreshAgeOfLastShippedOp() {
if (this.lastTimestamp > 0) {
setAgeOfLastShippedOp(this.lastTimestamp);
}
}
/**
* Set the size of the log queue
*
* @param size the size.
*/
public void setSizeOfLogQueue(int size) {
rms.setGauge(sizeOfLogQueueKey, size);
// The aggregate gauge sums queue sizes across all sources, so apply only the delta here.
rms.incGauge(SOURCE_SIZE_OF_LOG_QUEUE, size - lastQueueSize);
lastQueueSize = size;
}
/**
* Add to the number of log edits read
*
* @param delta the number of log edits read.
*/
private void incrLogEditsRead(long delta) {
rms.incCounters(logEditsReadKey, delta);
rms.incCounters(SOURCE_LOG_EDITS_READ, delta);
}
/** Increment the number of log edits read by one. */
public void incrLogEditsRead() {
incrLogEditsRead(1);
}
/**
* Add to the number of log edits filtered
*
* @param delta the number filtered.
*/
private void incrLogEditsFiltered(long delta) {
rms.incCounters(logEditsFilteredKey, delta);
rms.incCounters(SOURCE_LOG_EDITS_FILTERED, delta);
}
/** Increment the number of log edits filtered by one. */
public void incrLogEditsFiltered() {
incrLogEditsFiltered(1);
}
/**
* Convenience method to apply changes to metrics due to shipping a batch of logs.
*
* @param batchSize the size of the batch that was shipped to sinks.
*/
public void shipBatch(long batchSize) {
rms.incCounters(shippedBatchesKey, 1);
rms.incCounters(SOURCE_SHIPPED_BATCHES, 1);
rms.incCounters(shippedOpsKey, batchSize);
rms.incCounters(SOURCE_SHIPPED_OPS, batchSize);
}
/** Removes all metrics about this Source. */
public void clear() {
rms.removeGauge(sizeOfLogQueueKey);
// Back this source's contribution out of the aggregate queue-size gauge.
rms.decGauge(SOURCE_SIZE_OF_LOG_QUEUE, lastQueueSize);
lastQueueSize = 0;
rms.removeGauge(ageOfLastShippedOpKey);
rms.removeCounter(logEditsFilteredKey);
rms.removeCounter(logEditsReadKey);
rms.removeCounter(shippedBatchesKey);
rms.removeCounter(shippedOpsKey);
}
}
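
A companion sketch for the per-source metrics, assuming one ReplicationSourceMetrics instance per peer cluster; the peer id "1" and the call ordering below are illustrative assumptions:

package org.apache.hadoop.hbase.replication.regionserver.metrics;
/** Hypothetical usage sketch; not part of this patch. */
public class ReplicationSourceMetricsExample {
public static void main(String[] args) {
ReplicationSourceMetrics metrics = new ReplicationSourceMetrics("1"); // "1" = assumed peer id
metrics.setSizeOfLogQueue(3);   // source.1.sizeOfLogQueue = 3; aggregate gauge moves by +3
metrics.incrLogEditsRead();     // per-source and aggregate logEditsRead += 1
metrics.incrLogEditsFiltered(); // an edit was scoped out of replication
metrics.shipBatch(2);           // one batch shipped, containing 2 ops
metrics.setAgeOfLastShippedOp(System.currentTimeMillis());
// On a failed ship, republish lag from the last known timestamp so the gauge keeps aging.
metrics.refreshAgeOfLastShippedOp();
metrics.clear(); // source removed: tear down its per-source metrics
}
}

Note that the per-source keys embed the source id (source.1.*) while the aggregate keys (source.*) accumulate deltas across all sources, which is why setSizeOfLogQueue and clear() must track lastQueueSize.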

pom.xml

@@ -41,6 +41,9 @@
<url>http://hbase.apache.org</url>
<modules>
<module>hbase-server</module>
<module>hbase-hadoop2-compat</module>
<module>hbase-hadoop1-compat</module>
<module>hbase-hadoop-compat</module>
<module>hbase-common</module>
<module>hbase-it</module>
</modules>
@@ -600,7 +603,7 @@
<tarLongFileMode>gnu</tarLongFileMode>
<appendAssemblyId>false</appendAssemblyId>
<descriptors>
<descriptor>src/assembly/all.xml</descriptor>
<descriptor>${assembly.file}</descriptor>
</descriptors>
</configuration>
</plugin>
@@ -786,6 +789,8 @@
<compileSource>1.6</compileSource>
<hbase.version>${project.version}</hbase.version>
<!-- Dependencies -->
<hadoop-two.version>2.0.0-alpha</hadoop-two.version>
<hadoop-one.version>1.0.3</hadoop-one.version>
<avro.version>1.5.3</avro.version>
<commons-cli.version>1.2</commons-cli.version>
<commons-codec.version>1.4</commons-codec.version>
@@ -837,6 +842,7 @@
<server.test.jar>hbase-server-${project.version}-tests.jar</server.test.jar>
<surefire.version>2.12-TRUNK-HBASE-2</surefire.version>
<surefire.provider>surefire-junit47</surefire.provider>
<compat.module>hbase-hadoop1-compat</compat.module>
<!-- default: run small & medium, medium with 2 threads -->
<surefire.skipFirstPart>false</surefire.skipFirstPart>
<surefire.skipSecondPart>false</surefire.skipSecondPart>
@@ -865,6 +871,16 @@
<artifactId>hbase-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>${compat.module}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>hbase-server</artifactId>
<groupId>org.apache.hbase</groupId>
@@ -1227,8 +1243,10 @@
</property>
</activation>
<properties>
<hadoop.version>1.0.3</hadoop.version>
<hadoop.version>${hadoop-one.version}</hadoop.version>
<slf4j.version>1.4.3</slf4j.version>
<compat.module>hbase-hadoop1-compat</compat.module>
<assembly.file>src/assembly/hadoop-one-compat.xml</assembly.file>
</properties>
<dependencyManagement>
<dependencies>
@@ -1282,26 +1300,28 @@
</activation>
<properties>
<slf4j.version>1.6.1</slf4j.version>
<hadoop.version>2.0.0-alpha</hadoop.version>
<hadoop.version>${hadoop-two.version}</hadoop.version>
<compat.module>hbase-hadoop2-compat</compat.module>
<assembly.file>src/assembly/hadoop-two-compat.xml</assembly.file>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<version>${hadoop-two.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<version>${hadoop.version}</version>
<version>${hadoop-two.version}</version>
</dependency>
<!-- This was marked as test dep in earlier pom, but was scoped compile.
Where do we actually need it? -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>${hadoop.version}</version>
<version>${hadoop-two.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
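
Each Hadoop profile swaps in its own implementation jar through the ${compat.module} property, so the interfaces in hbase-hadoop-compat are bound at runtime to whichever compat jar is on the classpath. One plausible way to do that binding is java.util.ServiceLoader; the sketch below assumes service registration via META-INF/services and is not necessarily how ReplicationMetricsSourceFactory is implemented in this patch:

package org.apache.hadoop.hbase.replication.regionserver.metrics;
import java.util.Iterator;
import java.util.ServiceLoader;
/** Hypothetical factory sketch; assumes implementations register via META-INF/services. */
public class CompatFactorySketch {
public static ReplicationMetricsSource load() {
// Finds whichever hbase-hadoop1-compat or hbase-hadoop2-compat jar is on the classpath.
Iterator<ReplicationMetricsSource> it =
ServiceLoader.load(ReplicationMetricsSource.class).iterator();
if (!it.hasNext()) {
throw new RuntimeException("No ReplicationMetricsSource implementation found");
}
return it.next();
}
}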


@@ -124,7 +124,10 @@
<!-- Now, select which projects to include in this module-set. -->
<!-- Just add future modules here, assuming the wildcard doesn't match -->
<includes>
<include>org.apache.hbase:hbase-*</include>
<include>org.apache.hbase:hbase-common</include>
<include>org.apache.hbase:hbase-hadoop-compat</include>
<include>org.apache.hbase:hbase-hadoop1-compat</include>
<include>org.apache.hbase:hbase-server</include>
</includes>
<!-- Include all the sources in the top directory -->
<sources>


@@ -0,0 +1,156 @@
<?xml version="1.0"?>
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.1"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.1 http://maven.apache.org/xsd/assembly-1.1.1.xsd">
<!--
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<!--This 'all' id is not appended to the produced bundle because
we do this: http://maven.apache.org/plugins/maven-assembly-plugin/faq.html#required-classifiers -->
<id>all</id>
<formats>
<format>tar.gz</format>
</formats>
<fileSets>
<!--Copy over the site if built as docs dir-->
<fileSet>
<directory>target/site</directory>
<outputDirectory>docs</outputDirectory>
</fileSet>
<!-- Include top level text files -->
<fileSet>
<outputDirectory>/</outputDirectory>
<includes>
<include>*.txt</include>
<include>pom.xml</include>
</includes>
</fileSet>
<!-- Include the top level conf directory -->
<fileSet>
<directory>conf</directory>
<outputDirectory>conf</outputDirectory>
<fileMode>0644</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<!-- Include top level bin directory -->
<fileSet>
<directory>bin</directory>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<!--Include top-level src. Module src done down below-->
<fileSet>
<directory>src</directory>
<outputDirectory>src</outputDirectory>
<fileMode>0644</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<!-- Include dev-support directory -->
<fileSet>
<directory>dev-support</directory>
<outputDirectory>dev-support</outputDirectory>
<fileMode>0755</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<!-- Move the ruby code over -->
<fileSet>
<directory>hbase-server/src/main/ruby</directory>
<outputDirectory>lib/ruby</outputDirectory>
<fileMode>0644</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<!-- Move the webapps to the webapp dir -->
<fileSet>
<directory>hbase-server/target/hbase-webapps</directory>
<outputDirectory>hbase-webapps</outputDirectory>
<fileMode>0644</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<!-- Include native libraries -->
<fileSet>
<directory>hbase-server/target/native</directory>
<outputDirectory>native</outputDirectory>
<fileMode>0755</fileMode>
<directoryMode>0755</directoryMode>
<includes>
<include>*.so</include>
</includes>
</fileSet>
<!-- This is only necessary until maven fixes the intra-project dependency bug
in maven 3.0. Until then, we have to include the test jars for sub-projects. When
fixed, the below dependencySet stuff is sufficient for pulling in the test jars as
well, as long as they are added as dependencies in this project. Right now, we only
have 1 submodule to accumulate, but we can copy/paste as necessary until maven is
fixed. -->
<fileSet>
<directory>${parent.basedir}/hbase-server/target/</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>${server.test.jar}</include>
</includes>
<fileMode>0644</fileMode>
</fileSet>
</fileSets>
<moduleSets>
<moduleSet>
<!-- Enable access to all projects in the current multimodule build. Eclipse
says this is an error, but builds from the command line just fine. -->
<useAllReactorProjects>true</useAllReactorProjects>
<!-- This should work with more than 1 source module -->
<!-- Now, select which projects to include in this module-set. -->
<!-- Just add future modules here, assuming the wildcard doesn't match -->
<includes>
<include>org.apache.hbase:hbase-common</include>
<include>org.apache.hbase:hbase-hadoop-compat</include>
<include>org.apache.hbase:hbase-hadoop2-compat</include>
<include>org.apache.hbase:hbase-server</include>
</includes>
<!-- Include all the sources in the top directory -->
<sources>
<fileSets>
<fileSet>
<excludes>
<exclude>target/</exclude>
<exclude>test/</exclude>
<exclude>.classpath</exclude>
<exclude>.project</exclude>
<exclude>.settings/</exclude>
</excludes>
</fileSet>
</fileSets>
</sources>
<!-- Binaries for the dependencies also go in the hbase-jars directory -->
<binaries>
<outputDirectory>lib</outputDirectory>
<unpack>false</unpack>
<dependencySets>
<dependencySet/>
</dependencySets>
</binaries>
</moduleSet>
</moduleSets>
</assembly>