HBASE-4070 Improve region server metrics to report loaded coprocessors to master

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1188442 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Kyle Purtell 2011-10-24 23:18:04 +00:00
parent 95a8d2bfc4
commit ed76ba8de3
10 changed files with 366 additions and 17 deletions

View File

@ -395,6 +395,8 @@ Release 0.92.0 - Unreleased
HBASE-4642 Add Apache License Header HBASE-4642 Add Apache License Header
HBASE-4591 TTL for old HLogs should be calculated from last modification time. HBASE-4591 TTL for old HLogs should be calculated from last modification time.
HBASE-4578 NPE when altering a table that has moving regions (gaojinchao) HBASE-4578 NPE when altering a table that has moving regions (gaojinchao)
HBASE-4070 Improve region server metrics to report loaded coprocessors to
master (Eugene Koontz via apurtell)
TESTS TESTS
HBASE-4450 test for number of blocks read: to serve as baseline for expected HBASE-4450 test for number of blocks read: to serve as baseline for expected

View File

@ -106,6 +106,12 @@ org.apache.hadoop.hbase.HBaseConfiguration;
<tr><td>Fragmentation</td><td><% frags.get("-TOTAL-") != null ? frags.get("-TOTAL-").intValue() + "%" : "n/a" %></td><td>Overall fragmentation of all tables, including .META. and -ROOT-.</td></tr> <tr><td>Fragmentation</td><td><% frags.get("-TOTAL-") != null ? frags.get("-TOTAL-").intValue() + "%" : "n/a" %></td><td>Overall fragmentation of all tables, including .META. and -ROOT-.</td></tr>
</%if> </%if>
<tr><td>Zookeeper Quorum</td><td><% master.getZooKeeperWatcher().getQuorum() %></td><td>Addresses of all registered ZK servers. For more, see <a href="/zk.jsp">zk dump</a>.</td></tr> <tr><td>Zookeeper Quorum</td><td><% master.getZooKeeperWatcher().getQuorum() %></td><td>Addresses of all registered ZK servers. For more, see <a href="/zk.jsp">zk dump</a>.</td></tr>
<tr>
<td>
Coprocessors</td><td><% java.util.Arrays.toString(master.getCoprocessors()) %>
</td>
<td>Coprocessors currently loaded loaded by the master</td>
</tr>
</table> </table>
<& ../common/TaskMonitorTmpl; filter = filter &> <& ../common/TaskMonitorTmpl; filter = filter &>

View File

@ -84,6 +84,13 @@ org.apache.hadoop.hbase.HBaseConfiguration;
<tr><td>HBase Compiled</td><td><% org.apache.hadoop.hbase.util.VersionInfo.getDate() %>, <% org.apache.hadoop.hbase.util.VersionInfo.getUser() %></td><td>When HBase version was compiled and by whom</td></tr> <tr><td>HBase Compiled</td><td><% org.apache.hadoop.hbase.util.VersionInfo.getDate() %>, <% org.apache.hadoop.hbase.util.VersionInfo.getUser() %></td><td>When HBase version was compiled and by whom</td></tr>
<tr><td>Metrics</td><td><% metrics.toString() %></td><td>RegionServer Metrics; file and heap sizes are in megabytes</td></tr> <tr><td>Metrics</td><td><% metrics.toString() %></td><td>RegionServer Metrics; file and heap sizes are in megabytes</td></tr>
<tr><td>Zookeeper Quorum</td><td><% regionServer.getZooKeeper().getQuorum() %></td><td>Addresses of all registered ZK servers</td></tr> <tr><td>Zookeeper Quorum</td><td><% regionServer.getZooKeeper().getQuorum() %></td><td>Addresses of all registered ZK servers</td></tr>
<tr>
<td>Coprocessors</td>
<td>
<% java.util.Arrays.toString(regionServer.getCoprocessors()) %>
</td>
<td>Coprocessors currently loaded by this regionserver</td>
</tr>
</table> </table>
<& ../common/TaskMonitorTmpl; filter = filter &> <& ../common/TaskMonitorTmpl; filter = filter &>

View File

@ -24,6 +24,7 @@ import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -66,6 +67,7 @@ public class ClusterStatus extends VersionedWritable {
private Collection<ServerName> deadServers; private Collection<ServerName> deadServers;
private Map<String, RegionState> intransition; private Map<String, RegionState> intransition;
private String clusterId; private String clusterId;
private String[] masterCoprocessors;
/** /**
* Constructor, for Writable * Constructor, for Writable
@ -76,12 +78,14 @@ public class ClusterStatus extends VersionedWritable {
public ClusterStatus(final String hbaseVersion, final String clusterid, public ClusterStatus(final String hbaseVersion, final String clusterid,
final Map<ServerName, HServerLoad> servers, final Map<ServerName, HServerLoad> servers,
final Collection<ServerName> deadServers, final Map<String, RegionState> rit) { final Collection<ServerName> deadServers, final Map<String, RegionState> rit,
final String[] masterCoprocessors) {
this.hbaseVersion = hbaseVersion; this.hbaseVersion = hbaseVersion;
this.liveServers = servers; this.liveServers = servers;
this.deadServers = deadServers; this.deadServers = deadServers;
this.intransition = rit; this.intransition = rit;
this.clusterId = clusterid; this.clusterId = clusterid;
this.masterCoprocessors = masterCoprocessors;
} }
/** /**
@ -155,7 +159,8 @@ public class ClusterStatus extends VersionedWritable {
return (getVersion() == ((ClusterStatus)o).getVersion()) && return (getVersion() == ((ClusterStatus)o).getVersion()) &&
getHBaseVersion().equals(((ClusterStatus)o).getHBaseVersion()) && getHBaseVersion().equals(((ClusterStatus)o).getHBaseVersion()) &&
this.liveServers.equals(((ClusterStatus)o).liveServers) && this.liveServers.equals(((ClusterStatus)o).liveServers) &&
deadServers.equals(((ClusterStatus)o).deadServers); deadServers.equals(((ClusterStatus)o).deadServers) &&
Arrays.equals(this.masterCoprocessors, ((ClusterStatus)o).masterCoprocessors);
} }
/** /**
@ -205,6 +210,10 @@ public class ClusterStatus extends VersionedWritable {
return clusterId; return clusterId;
} }
public String[] getMasterCoprocessors() {
return masterCoprocessors;
}
// //
// Writable // Writable
// //
@ -227,6 +236,10 @@ public class ClusterStatus extends VersionedWritable {
e.getValue().write(out); e.getValue().write(out);
} }
out.writeUTF(clusterId); out.writeUTF(clusterId);
out.writeInt(masterCoprocessors.length);
for(String masterCoprocessor: masterCoprocessors) {
out.writeUTF(masterCoprocessor);
}
} }
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
@ -254,5 +267,10 @@ public class ClusterStatus extends VersionedWritable {
this.intransition.put(key, regionState); this.intransition.put(key, regionState);
} }
this.clusterId = in.readUTF(); this.clusterId = in.readUTF();
int masterCoprocessorsLength = in.readInt();
masterCoprocessors = new String[masterCoprocessorsLength];
for(int i = 0; i < masterCoprocessorsLength; i++) {
masterCoprocessors[i] = in.readUTF();
}
} }
} }

View File

@ -22,9 +22,12 @@ package org.apache.hadoop.hbase;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Strings; import org.apache.hadoop.hbase.util.Strings;
@ -55,6 +58,33 @@ implements WritableComparable<HServerLoad> {
/** the maximum allowable size of the heap, in MB */ /** the maximum allowable size of the heap, in MB */
private int maxHeapMB = 0; private int maxHeapMB = 0;
// Regionserver-level coprocessors, e.g., WALObserver implementations.
// Region-level coprocessors, on the other hand, are stored inside RegionLoad
// objects.
private Set<String> coprocessors =
new TreeSet<String>();
/**
* HBASE-4070: Improve region server metrics to report loaded coprocessors.
*
* @return Returns the set of all coprocessors on this
* regionserver, where this set is the union of the
* regionserver-level coprocessors on one hand, and all of the region-level
* coprocessors, on the other.
*
* We must iterate through all regions loaded on this regionserver to
* obtain all of the region-level coprocessors.
*/
public String[] getCoprocessors() {
TreeSet<String> returnValue = new TreeSet<String>(coprocessors);
for (Map.Entry<byte[], RegionLoad> rls: getRegionsLoad().entrySet()) {
for (String coprocessor: rls.getValue().getCoprocessors()) {
returnValue.add(coprocessor);
}
}
return returnValue.toArray(new String[0]);
}
/** per-region load metrics */ /** per-region load metrics */
private Map<byte[], RegionLoad> regionLoad = private Map<byte[], RegionLoad> regionLoad =
new TreeMap<byte[], RegionLoad>(Bytes.BYTES_COMPARATOR); new TreeMap<byte[], RegionLoad>(Bytes.BYTES_COMPARATOR);
@ -114,6 +144,10 @@ implements WritableComparable<HServerLoad> {
*/ */
private int totalStaticBloomSizeKB; private int totalStaticBloomSizeKB;
// Region-level coprocessors.
Set<String> coprocessors =
new TreeSet<String>();
/** /**
* Constructor, for Writable * Constructor, for Writable
*/ */
@ -133,6 +167,7 @@ implements WritableComparable<HServerLoad> {
* @param writeRequestsCount * @param writeRequestsCount
* @param totalCompactingKVs * @param totalCompactingKVs
* @param currentCompactedKVs * @param currentCompactedKVs
* @param coprocessors
*/ */
public RegionLoad(final byte[] name, final int stores, public RegionLoad(final byte[] name, final int stores,
final int storefiles, final int storeUncompressedSizeMB, final int storefiles, final int storeUncompressedSizeMB,
@ -141,7 +176,8 @@ implements WritableComparable<HServerLoad> {
final int rootIndexSizeKB, final int totalStaticIndexSizeKB, final int rootIndexSizeKB, final int totalStaticIndexSizeKB,
final int totalStaticBloomSizeKB, final int totalStaticBloomSizeKB,
final int readRequestsCount, final int writeRequestsCount, final int readRequestsCount, final int writeRequestsCount,
final long totalCompactingKVs, final long currentCompactedKVs) { final long totalCompactingKVs, final long currentCompactedKVs,
final Set<String> coprocessors) {
this.name = name; this.name = name;
this.stores = stores; this.stores = stores;
this.storefiles = storefiles; this.storefiles = storefiles;
@ -156,9 +192,13 @@ implements WritableComparable<HServerLoad> {
this.writeRequestsCount = writeRequestsCount; this.writeRequestsCount = writeRequestsCount;
this.totalCompactingKVs = totalCompactingKVs; this.totalCompactingKVs = totalCompactingKVs;
this.currentCompactedKVs = currentCompactedKVs; this.currentCompactedKVs = currentCompactedKVs;
this.coprocessors = coprocessors;
} }
// Getters // Getters
private String[] getCoprocessors() {
return coprocessors.toArray(new String[0]);
}
/** /**
* @return the region name * @return the region name
@ -332,6 +372,11 @@ implements WritableComparable<HServerLoad> {
this.totalStaticBloomSizeKB = in.readInt(); this.totalStaticBloomSizeKB = in.readInt();
this.totalCompactingKVs = in.readLong(); this.totalCompactingKVs = in.readLong();
this.currentCompactedKVs = in.readLong(); this.currentCompactedKVs = in.readLong();
int coprocessorsSize = in.readInt();
coprocessors = new TreeSet<String>();
for (int i = 0; i < coprocessorsSize; i++) {
coprocessors.add(in.readUTF());
}
} }
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
@ -352,6 +397,10 @@ implements WritableComparable<HServerLoad> {
out.writeInt(totalStaticBloomSizeKB); out.writeInt(totalStaticBloomSizeKB);
out.writeLong(totalCompactingKVs); out.writeLong(totalCompactingKVs);
out.writeLong(currentCompactedKVs); out.writeLong(currentCompactedKVs);
out.writeInt(coprocessors.size());
for (String coprocessor: coprocessors) {
out.writeUTF(coprocessor);
}
} }
/** /**
@ -397,6 +446,11 @@ implements WritableComparable<HServerLoad> {
} }
sb = Strings.appendKeyValue(sb, "compactionProgressPct", sb = Strings.appendKeyValue(sb, "compactionProgressPct",
compactionProgressPct); compactionProgressPct);
String coprocessors = Arrays.toString(getCoprocessors());
if (coprocessors != null) {
sb = Strings.appendKeyValue(sb, "coprocessors",
Arrays.toString(getCoprocessors()));
}
return sb.toString(); return sb.toString();
} }
} }
@ -424,15 +478,18 @@ implements WritableComparable<HServerLoad> {
* @param numberOfRequests * @param numberOfRequests
* @param usedHeapMB * @param usedHeapMB
* @param maxHeapMB * @param maxHeapMB
* @param coprocessors : coprocessors loaded at the regionserver-level
*/ */
public HServerLoad(final int totalNumberOfRequests, public HServerLoad(final int totalNumberOfRequests,
final int numberOfRequests, final int usedHeapMB, final int maxHeapMB, final int numberOfRequests, final int usedHeapMB, final int maxHeapMB,
final Map<byte[], RegionLoad> regionLoad) { final Map<byte[], RegionLoad> regionLoad,
final Set<String> coprocessors) {
this.numberOfRequests = numberOfRequests; this.numberOfRequests = numberOfRequests;
this.usedHeapMB = usedHeapMB; this.usedHeapMB = usedHeapMB;
this.maxHeapMB = maxHeapMB; this.maxHeapMB = maxHeapMB;
this.regionLoad = regionLoad; this.regionLoad = regionLoad;
this.totalNumberOfRequests = totalNumberOfRequests; this.totalNumberOfRequests = totalNumberOfRequests;
this.coprocessors = coprocessors;
} }
/** /**
@ -441,7 +498,7 @@ implements WritableComparable<HServerLoad> {
*/ */
public HServerLoad(final HServerLoad hsl) { public HServerLoad(final HServerLoad hsl) {
this(hsl.totalNumberOfRequests, hsl.numberOfRequests, hsl.usedHeapMB, this(hsl.totalNumberOfRequests, hsl.numberOfRequests, hsl.usedHeapMB,
hsl.maxHeapMB, hsl.getRegionsLoad()); hsl.maxHeapMB, hsl.getRegionsLoad(), hsl.coprocessors);
for (Map.Entry<byte[], RegionLoad> e : hsl.regionLoad.entrySet()) { for (Map.Entry<byte[], RegionLoad> e : hsl.regionLoad.entrySet()) {
this.regionLoad.put(e.getKey(), e.getValue()); this.regionLoad.put(e.getKey(), e.getValue());
} }
@ -487,6 +544,10 @@ implements WritableComparable<HServerLoad> {
sb = Strings.appendKeyValue(sb, "usedHeapMB", sb = Strings.appendKeyValue(sb, "usedHeapMB",
Integer.valueOf(this.usedHeapMB)); Integer.valueOf(this.usedHeapMB));
sb = Strings.appendKeyValue(sb, "maxHeapMB", Integer.valueOf(maxHeapMB)); sb = Strings.appendKeyValue(sb, "maxHeapMB", Integer.valueOf(maxHeapMB));
String coprocessors = Arrays.toString(getCoprocessors());
if (coprocessors != null) {
sb = Strings.appendKeyValue(sb, "coprocessors", coprocessors);
}
return sb.toString(); return sb.toString();
} }
@ -607,6 +668,10 @@ implements WritableComparable<HServerLoad> {
regionLoad.put(rl.getName(), rl); regionLoad.put(rl.getName(), rl);
} }
totalNumberOfRequests = in.readInt(); totalNumberOfRequests = in.readInt();
int coprocessorsSize = in.readInt();
for(int i = 0; i < coprocessorsSize; i++) {
coprocessors.add(in.readUTF());
}
} }
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
@ -619,6 +684,10 @@ implements WritableComparable<HServerLoad> {
for (RegionLoad rl: regionLoad.values()) for (RegionLoad rl: regionLoad.values())
rl.write(out); rl.write(out);
out.writeInt(totalNumberOfRequests); out.writeInt(totalNumberOfRequests);
out.writeInt(coprocessors.size());
for (String coprocessor: coprocessors) {
out.writeUTF(coprocessor);
}
} }
// Comparable // Comparable

View File

@ -1647,4 +1647,13 @@ public class HBaseAdmin implements Abortable, Closeable {
sn.getHostname(), sn.getPort()); sn.getHostname(), sn.getPort());
return rs.rollHLogWriter(); return rs.rollHLogWriter();
} }
public String[] getMasterCoprocessors() {
try {
return getClusterStatus().getMasterCoprocessors();
} catch (IOException e) {
LOG.error("Could not getClusterStatus()",e);
return null;
}
}
} }

View File

@ -73,12 +73,35 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
pathPrefix = UUID.randomUUID().toString(); pathPrefix = UUID.randomUUID().toString();
} }
/**
* Not to be confused with the per-object _coprocessors_ (above),
* coprocessorNames is static and stores the set of all coprocessors ever
* loaded by any thread in this JVM. It is strictly additive: coprocessors are
* added to coprocessorNames, by loadInstance() but are never removed, since
* the intention is to preserve a history of all loaded coprocessors for
* diagnosis in case of server crash (HBASE-4014).
*/
private static Set<String> coprocessorNames = private static Set<String> coprocessorNames =
Collections.synchronizedSet(new HashSet<String>()); Collections.synchronizedSet(new HashSet<String>());
public static Set<String> getLoadedCoprocessors() { public static Set<String> getLoadedCoprocessors() {
return coprocessorNames; return coprocessorNames;
} }
/**
* Used to create a parameter to the HServerLoad constructor so that
* HServerLoad can provide information about the coprocessors loaded by this
* regionserver.
* (HBASE-4070: Improve region server metrics to report loaded coprocessors
* to master).
*/
public Set<String> getCoprocessors() {
Set<String> returnValue = new TreeSet<String>();
for(CoprocessorEnvironment e: coprocessors) {
returnValue.add(e.getInstance().getClass().getSimpleName());
}
return returnValue;
}
/** /**
* Load system coprocessors. Read the class names from configuration. * Load system coprocessors. Read the class names from configuration.
* Called by constructor. * Called by constructor.

View File

@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -345,7 +346,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
} }
/** /**
* Initilize all ZK based system trackers. * Initialize all ZK based system trackers.
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
*/ */
@ -1137,7 +1138,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
this.fileSystemManager.getClusterId(), this.fileSystemManager.getClusterId(),
this.serverManager.getOnlineServers(), this.serverManager.getOnlineServers(),
this.serverManager.getDeadServers(), this.serverManager.getDeadServers(),
this.assignmentManager.getRegionsInTransition()); this.assignmentManager.getRegionsInTransition(),
this.getCoprocessors());
} }
public String getClusterId() { public String getClusterId() {
@ -1155,6 +1157,15 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
return CoprocessorHost.getLoadedCoprocessors().toString(); return CoprocessorHost.getLoadedCoprocessors().toString();
} }
/**
* @return array of coprocessor SimpleNames.
*/
public String[] getCoprocessors() {
Set<String> masterCoprocessors =
getCoprocessorHost().getCoprocessors();
return masterCoprocessors.toArray(new String[0]);
}
@Override @Override
public void abort(final String msg, final Throwable t) { public void abort(final String msg, final Throwable t) {
if (cpHost != null) { if (cpHost != null) {

View File

@ -810,7 +810,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
return new HServerLoad(requestCount.get(),(int)metrics.getRequests(), return new HServerLoad(requestCount.get(),(int)metrics.getRequests(),
(int)(memory.getUsed() / 1024 / 1024), (int)(memory.getUsed() / 1024 / 1024),
(int) (memory.getMax() / 1024 / 1024), regionLoads); (int) (memory.getMax() / 1024 / 1024), regionLoads,
this.hlog.getCoprocessorHost().getCoprocessors());
} }
String getOnlineRegionsAsPrintableString() { String getOnlineRegionsAsPrintableString() {
@ -1010,7 +1011,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB, storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB,
totalStaticIndexSizeKB, totalStaticBloomSizeKB, totalStaticIndexSizeKB, totalStaticBloomSizeKB,
(int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(), (int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(),
totalCompactingKVs, currentCompactedKVs); totalCompactingKVs, currentCompactedKVs,
r.getCoprocessorHost().getCoprocessors());
} }
/** /**
@ -3349,4 +3351,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
HLog wal = this.getWAL(); HLog wal = this.getWAL();
return wal.rollWriter(true); return wal.rollWriter(true);
} }
// used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
public String[] getCoprocessors() {
HServerLoad hsl = buildServerLoad();
return hsl == null? null: hsl.getCoprocessors();
}
} }

View File

@ -25,12 +25,15 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -38,10 +41,12 @@ import org.apache.hadoop.fs.Path;
import javax.tools.*; import javax.tools.*;
import java.io.*; import java.io.*;
import java.util.*; import java.util.*;
import java.util.Arrays;
import java.util.jar.*; import java.util.jar.*;
import org.junit.*; import org.junit.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -63,10 +68,44 @@ public class TestClassLoading {
static final String cpName4 = "TestCP4"; static final String cpName4 = "TestCP4";
static final String cpName5 = "TestCP5"; static final String cpName5 = "TestCP5";
private static Class regionCoprocessor1 = ColumnAggregationEndpoint.class;
private static Class regionCoprocessor2 = GenericEndpoint.class;
private static Class regionServerCoprocessor = SampleRegionWALObserver.class;
private static Class masterCoprocessor = BaseMasterObserver.class;
private static final String[] regionServerSystemCoprocessors =
new String[]{
regionCoprocessor1.getSimpleName(),
regionServerCoprocessor.getSimpleName()
};
private static final String[] regionServerSystemAndUserCoprocessors =
new String[] {
regionCoprocessor1.getSimpleName(),
regionCoprocessor2.getSimpleName(),
regionServerCoprocessor.getSimpleName()
};
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(1);
conf = TEST_UTIL.getConfiguration(); conf = TEST_UTIL.getConfiguration();
// regionCoprocessor1 will be loaded on all regionservers, since it is
// loaded for any tables (user or meta).
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
regionCoprocessor1.getName());
// regionCoprocessor2 will be loaded only on regionservers that serve a
// user table region. Therefore, if there are no user tables loaded,
// this coprocessor will not be loaded on any regionserver.
conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
regionCoprocessor2.getName());
conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
regionServerCoprocessor.getName());
conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
masterCoprocessor.getName());
TEST_UTIL.startMiniCluster(1);
cluster = TEST_UTIL.getDFSCluster(); cluster = TEST_UTIL.getDFSCluster();
} }
@ -204,10 +243,11 @@ public class TestClassLoading {
admin.createTable(htd); admin.createTable(htd);
// verify that the coprocessors were loaded // verify that the coprocessors were loaded
boolean found1 = false, found2 = false, found2_k1 = false, found2_k2 = false, boolean found1 = false, found2 = false, found2_k1 = false,
found2_k3 = false; found2_k2 = false, found2_k3 = false;
MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster(); MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) { for (HRegion region:
hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
if (region.getRegionNameAsString().startsWith(tableName)) { if (region.getRegionNameAsString().startsWith(tableName)) {
CoprocessorEnvironment env; CoprocessorEnvironment env;
env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName1); env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName1);
@ -247,7 +287,8 @@ public class TestClassLoading {
// verify that the coprocessor was loaded // verify that the coprocessor was loaded
boolean found = false; boolean found = false;
MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster(); MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) { for (HRegion region:
hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
if (region.getRegionNameAsString().startsWith(cpName3)) { if (region.getRegionNameAsString().startsWith(cpName3)) {
found = (region.getCoprocessorHost().findCoprocessor(cpName3) != null); found = (region.getCoprocessorHost().findCoprocessor(cpName3) != null);
} }
@ -310,7 +351,8 @@ public class TestClassLoading {
found5_k4 = false; found5_k4 = false;
MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster(); MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) { for (HRegion region:
hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
if (region.getRegionNameAsString().startsWith(tableName)) { if (region.getRegionNameAsString().startsWith(tableName)) {
found_1 = found_1 || found_1 = found_1 ||
(region.getCoprocessorHost().findCoprocessor(cpName1) != null); (region.getCoprocessorHost().findCoprocessor(cpName1) != null);
@ -333,6 +375,7 @@ public class TestClassLoading {
} }
} }
} }
assertTrue("Class " + cpName1 + " was missing on a region", found_1); assertTrue("Class " + cpName1 + " was missing on a region", found_1);
assertTrue("Class " + cpName2 + " was missing on a region", found_2); assertTrue("Class " + cpName2 + " was missing on a region", found_2);
assertTrue("Class SimpleRegionObserver was missing on a region", found_3); assertTrue("Class SimpleRegionObserver was missing on a region", found_3);
@ -344,4 +387,157 @@ public class TestClassLoading {
assertTrue("Configuration key 'k3' was missing on a region", found5_k3); assertTrue("Configuration key 'k3' was missing on a region", found5_k3);
assertFalse("Configuration key 'k4' wasn't configured", found5_k4); assertFalse("Configuration key 'k4' wasn't configured", found5_k4);
} }
@Test
public void testRegionServerCoprocessorsReported() throws Exception {
// HBASE 4070: Improve region server metrics to report loaded coprocessors
// to master: verify that each regionserver is reporting the correct set of
// loaded coprocessors.
// We rely on the fact that getCoprocessors() will return a sorted
// display of the coprocessors' names, so for example, regionCoprocessor1's
// name "ColumnAggregationEndpoint" will appear before regionCoprocessor2's
// name "GenericEndpoint" because "C" is before "G" lexicographically.
HBaseAdmin admin = new HBaseAdmin(this.conf);
// disable all user tables, if any are loaded.
for (HTableDescriptor htd: admin.listTables()) {
if (!htd.isMetaTable()) {
String tableName = htd.getNameAsString();
if (admin.isTableEnabled(tableName)) {
try {
admin.disableTable(htd.getNameAsString());
} catch (TableNotEnabledException e) {
// ignoring this exception for now : not sure why it's happening.
}
}
}
}
// should only be system coprocessors loaded at this point.
assertAllRegionServers(regionServerSystemCoprocessors,null);
// The next two tests enable and disable user tables to see if coprocessor
// load reporting changes as coprocessors are loaded and unloaded.
//
// Create a table.
// should cause regionCoprocessor2 to be loaded, since we've specified it
// for loading on any user table with USER_REGION_COPROCESSOR_CONF_KEY
// in setUpBeforeClass().
String userTable1 = "userTable1";
HTableDescriptor userTD1 = new HTableDescriptor(userTable1);
admin.createTable(userTD1);
// table should be enabled now.
assertTrue(admin.isTableEnabled(userTable1));
assertAllRegionServers(regionServerSystemAndUserCoprocessors, userTable1);
// unload and make sure we're back to only system coprocessors again.
admin.disableTable(userTable1);
assertAllRegionServers(regionServerSystemCoprocessors,null);
// create another table, with its own specified coprocessor.
String userTable2 = "userTable2";
HTableDescriptor htd2 = new HTableDescriptor(userTable2);
String userTableCP = "userTableCP";
File jarFile1 = buildCoprocessorJar(userTableCP);
htd2.addFamily(new HColumnDescriptor("myfamily"));
htd2.setValue("COPROCESSOR$1", jarFile1.toString() + "|" + userTableCP +
"|" + Coprocessor.PRIORITY_USER);
admin.createTable(htd2);
// table should be enabled now.
assertTrue(admin.isTableEnabled(userTable2));
ArrayList<String> existingCPsPlusNew =
new ArrayList<String>(Arrays.asList(regionServerSystemAndUserCoprocessors));
existingCPsPlusNew.add(userTableCP);
String[] existingCPsPlusNewArray = new String[existingCPsPlusNew.size()];
assertAllRegionServers(existingCPsPlusNew.toArray(existingCPsPlusNewArray),
userTable2);
admin.disableTable(userTable2);
assertTrue(admin.isTableDisabled(userTable2));
// we should be back to only system coprocessors again.
assertAllRegionServers(regionServerSystemCoprocessors, null);
}
/**
* return the subset of all regionservers
* (actually returns set of HServerLoads)
* which host some region in a given table.
* used by assertAllRegionServers() below to
* test reporting of loaded coprocessors.
* @param tableName : given table.
* @return subset of all servers.
*/
Map<ServerName, HServerLoad> serversForTable(String tableName) {
Map<ServerName, HServerLoad> serverLoadHashMap =
new HashMap<ServerName, HServerLoad>();
for(Map.Entry<ServerName,HServerLoad> server:
TEST_UTIL.getMiniHBaseCluster().getMaster().getServerManager().
getOnlineServers().entrySet()) {
for(Map.Entry<byte[], HServerLoad.RegionLoad> region:
server.getValue().getRegionsLoad().entrySet()) {
if (region.getValue().getNameAsString().equals(tableName)) {
// this server server hosts a region of tableName: add this server..
serverLoadHashMap.put(server.getKey(),server.getValue());
// .. and skip the rest of the regions that it hosts.
break;
}
}
}
return serverLoadHashMap;
}
void assertAllRegionServers(String[] expectedCoprocessors, String tableName)
throws InterruptedException {
Map<ServerName, HServerLoad> servers;
String[] actualCoprocessors = null;
boolean success = false;
for(int i = 0; i < 5; i++) {
if (tableName == null) {
//if no tableName specified, use all servers.
servers =
TEST_UTIL.getMiniHBaseCluster().getMaster().getServerManager().
getOnlineServers();
} else {
servers = serversForTable(tableName);
}
boolean any_failed = false;
for(Map.Entry<ServerName,HServerLoad> server: servers.entrySet()) {
actualCoprocessors = server.getValue().getCoprocessors();
if (!Arrays.equals(actualCoprocessors, expectedCoprocessors)) {
LOG.debug("failed comparison: actual: " +
Arrays.toString(actualCoprocessors) +
" ; expected: " + Arrays.toString(expectedCoprocessors));
any_failed = true;
break;
}
}
if (any_failed == false) {
success = true;
break;
}
LOG.debug("retrying after failed comparison: " + i);
Thread.sleep(1000);
}
assertTrue(success);
}
@Test
public void testMasterCoprocessorsReported() {
// HBASE 4070: Improve region server metrics to report loaded coprocessors
// to master: verify that the master is reporting the correct set of
// loaded coprocessors.
final String loadedMasterCoprocessorsVerify =
"[" + masterCoprocessor.getSimpleName() + "]";
String loadedMasterCoprocessors =
java.util.Arrays.toString(
TEST_UTIL.getHBaseCluster().getMaster().getCoprocessors());
assertEquals(loadedMasterCoprocessorsVerify, loadedMasterCoprocessors);
}
} }