Merge online snapshot branch with trunk 2/20/13
git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290v2@1448318 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in: commit e9e10378c7
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.lang.reflect.InvocationTargetException;
+
+public class ReflectionUtils {
+  @SuppressWarnings("unchecked")
+  public static <T> T instantiateWithCustomCtor(String className,
+      Class<?>[] ctorArgTypes, Object[] ctorArgs) {
+    try {
+      Class<? extends T> resultType = (Class<? extends T>) Class.forName(className);
+      return resultType.getDeclaredConstructor(ctorArgTypes).newInstance(ctorArgs);
+    } catch (ClassNotFoundException e) {
+      throw new UnsupportedOperationException(
+          "Unable to find " + className, e);
+    } catch (IllegalAccessException e) {
+      throw new UnsupportedOperationException(
+          "Unable to access specified class " + className, e);
+    } catch (InstantiationException e) {
+      throw new UnsupportedOperationException(
+          "Unable to instantiate specified class " + className, e);
+    } catch (InvocationTargetException e) {
+      throw new UnsupportedOperationException(
+          "Constructor threw an exception for " + className, e);
+    } catch (NoSuchMethodException e) {
+      throw new UnsupportedOperationException(
+          "Unable to find suitable constructor for class " + className, e);
+    }
+  }
+}

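For reference, a minimal sketch of how the new helper might be called; the class name and constructor arguments here are hypothetical, not part of this commit. Any checked reflection failure surfaces as an UnsupportedOperationException.

    // Hypothetical caller: instantiate a class by name through a non-default constructor.
    Configuration conf = HBaseConfiguration.create();
    MyPolicy policy = ReflectionUtils.<MyPolicy>instantiateWithCustomCtor(
        "org.example.MyPolicy",                  // fully qualified class name (hypothetical)
        new Class<?>[] { Configuration.class },  // constructor parameter types
        new Object[] { conf });                  // constructor arguments
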
@@ -95,6 +95,7 @@ public class TestBulkDeleteProtocol {
       rows++;
     }
     assertEquals(0, rows);
+    ht.close();
   }
 
   @Test
@@ -118,6 +119,7 @@ public class TestBulkDeleteProtocol {
       rows++;
     }
     assertEquals(0, rows);
+    ht.close();
   }
 
   private long invokeBulkDeleteProtocol(byte[] tableName, final Scan scan, final int rowBatchSize,
@@ -147,6 +149,7 @@ public class TestBulkDeleteProtocol {
     for (BulkDeleteResponse response : result.values()) {
       noOfDeletedRows += response.getRowsDeleted();
     }
+    ht.close();
     return noOfDeletedRows;
   }
 
@@ -177,6 +180,7 @@ public class TestBulkDeleteProtocol {
       rows++;
     }
     assertEquals(90, rows);
+    ht.close();
   }
 
   @Test
@@ -205,6 +209,7 @@ public class TestBulkDeleteProtocol {
       rows++;
     }
     assertEquals(100, rows);
+    ht.close();
   }
 
   @Test
@@ -235,6 +240,7 @@ public class TestBulkDeleteProtocol {
       rows++;
     }
     assertEquals(100, rows);
+    ht.close();
   }
 
   @Test
@@ -282,6 +288,7 @@ public class TestBulkDeleteProtocol {
       rows++;
     }
     assertEquals(100, rows);
+    ht.close();
   }
 
   @Test
@@ -328,6 +335,7 @@ public class TestBulkDeleteProtocol {
       rows++;
     }
     assertEquals(100, rows);
+    ht.close();
   }
 
   @Test
@@ -412,6 +420,7 @@ public class TestBulkDeleteProtocol {
       rows++;
     }
     assertEquals(100, rows);
+    ht.close();
   }
 
   private HTable createTable(byte[] tableName) throws IOException {

@@ -85,12 +85,12 @@ public class DistributedHBaseCluster extends HBaseCluster {
 
   @Override
   public AdminProtocol getAdminProtocol(ServerName serverName) throws IOException {
-    return admin.getConnection().getAdmin(serverName.getHostname(), serverName.getPort());
+    return admin.getConnection().getAdmin(serverName);
   }
 
   @Override
   public ClientProtocol getClientProtocol(ServerName serverName) throws IOException {
-    return admin.getConnection().getClient(serverName.getHostname(), serverName.getPort());
+    return admin.getConnection().getClient(serverName);
   }
 
   @Override
@@ -193,7 +193,7 @@ public class DistributedHBaseCluster extends HBaseCluster {
       return null;
     }
 
-    AdminProtocol client = connection.getAdmin(regionLoc.getHostname(), regionLoc.getPort());
+    AdminProtocol client = connection.getAdmin(regionLoc.getServerName());
     ServerInfo info = ProtobufUtil.getServerInfo(client);
     return ProtobufUtil.toServerName(info.getServerName());
   }

@@ -33,33 +33,19 @@ import org.apache.hadoop.hbase.util.Addressing;
 @InterfaceStability.Evolving
 public class HRegionLocation implements Comparable<HRegionLocation> {
   private final HRegionInfo regionInfo;
-  private final String hostname;
-  private final int port;
+  private final ServerName serverName;
   private final long seqNum;
   // Cache of the 'toString' result.
   private String cachedString = null;
   // Cache of the hostname + port
   private String cachedHostnamePort;
 
   /**
    * Constructor
    * @param regionInfo the HRegionInfo for the region
    */
-  public HRegionLocation(HRegionInfo regionInfo, final String hostname,
-      final int port, final long seqNum) {
+  public HRegionLocation(HRegionInfo regionInfo, ServerName serverName, long seqNum) {
     this.regionInfo = regionInfo;
-    this.hostname = hostname;
-    this.port = port;
+    this.serverName = serverName;
     this.seqNum = seqNum;
   }
 
-  /**
-   * Test constructor w/o seqNum.
-   */
-  public HRegionLocation(HRegionInfo regionInfo, final String hostname, final int port) {
-    this(regionInfo, hostname, port, 0);
-  }
-
   /**
    * @see java.lang.Object#toString()
    */
@@ -67,8 +53,7 @@ public class HRegionLocation implements Comparable<HRegionLocation> {
   public synchronized String toString() {
     if (this.cachedString == null) {
       this.cachedString = "region=" + this.regionInfo.getRegionNameAsString() +
-          ", hostname=" + this.hostname + ", port=" + this.port
-          + ", seqNum=" + seqNum;
+          ", hostname=" + this.serverName + ", seqNum=" + seqNum;
     }
     return this.cachedString;
   }
@@ -95,9 +80,7 @@ public class HRegionLocation implements Comparable<HRegionLocation> {
    */
   @Override
   public int hashCode() {
-    int result = this.hostname.hashCode();
-    result ^= this.port;
-    return result;
+    return this.serverName.hashCode();
   }
 
   /** @return HRegionInfo */
@@ -106,11 +89,11 @@ public class HRegionLocation implements Comparable<HRegionLocation> {
   }
 
   public String getHostname() {
-    return this.hostname;
+    return this.serverName.getHostname();
   }
 
   public int getPort() {
-    return this.port;
+    return this.serverName.getPort();
   }
 
   public long getSeqNum() {
@@ -123,18 +106,16 @@ public class HRegionLocation implements Comparable<HRegionLocation> {
   public synchronized String getHostnamePort() {
     if (this.cachedHostnamePort == null) {
       this.cachedHostnamePort =
-          Addressing.createHostAndPortStr(this.hostname, this.port);
+          Addressing.createHostAndPortStr(this.getHostname(), this.getPort());
     }
     return this.cachedHostnamePort;
   }
 
-  //
-  // Comparable
-  //
+  public ServerName getServerName() {
+    return serverName;
+  }
 
   public int compareTo(HRegionLocation o) {
-    int result = this.hostname.compareTo(o.getHostname());
-    if (result != 0) return result;
-    return this.port - o.getPort();
+    return serverName.compareTo(o.getServerName());
   }
 }

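As a quick illustration of the API change above (a sketch, not code from this commit, with example host, port and startcode values): callers now pass a ServerName instead of a separate hostname and port, so the startcode travels with the location.

    ServerName sn = new ServerName("rs1.example.com", 60020, 1361379600000L);
    HRegionLocation loc = new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, sn, 0L);
    loc.getHostname();    // "rs1.example.com", now derived from the ServerName
    loc.getPort();        // 60020
    loc.getServerName();  // the full ServerName, including the startcode
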
@@ -35,17 +35,19 @@ public class RegionMovedException extends NotServingRegionException {
 
   private final String hostname;
   private final int port;
+  private final long startCode;
   private final long locationSeqNum;
 
   private static final String HOST_FIELD = "hostname=";
   private static final String PORT_FIELD = "port=";
+  private static final String STARTCODE_FIELD = "startCode=";
   private static final String LOCATIONSEQNUM_FIELD = "locationSeqNum=";
 
-  public RegionMovedException(final String hostname, final int port,
-      final long locationSeqNum) {
-    super();
-    this.hostname = hostname;
-    this.port = port;
+  public RegionMovedException(ServerName serverName, long locationSeqNum) {
+    this.hostname = serverName.getHostname();
+    this.port = serverName.getPort();
+    this.startCode = serverName.getStartcode();
     this.locationSeqNum = locationSeqNum;
   }
 
@@ -57,6 +59,10 @@ public class RegionMovedException extends NotServingRegionException {
     return port;
   }
 
+  public ServerName getServerName(){
+    return new ServerName(hostname, port, startCode);
+  }
+
   public long getLocationSeqNum() {
     return locationSeqNum;
   }
@@ -69,22 +75,27 @@ public class RegionMovedException extends NotServingRegionException {
   public RegionMovedException(String s) {
     int posHostname = s.indexOf(HOST_FIELD) + HOST_FIELD.length();
     int posPort = s.indexOf(PORT_FIELD) + PORT_FIELD.length();
+    int posStartCode = s.indexOf(STARTCODE_FIELD) + STARTCODE_FIELD.length();
     int posSeqNum = s.indexOf(LOCATIONSEQNUM_FIELD) + LOCATIONSEQNUM_FIELD.length();
 
     String tmpHostname = null;
     int tmpPort = -1;
+    long tmpStartCode = -1;
    long tmpSeqNum = HConstants.NO_SEQNUM;
     try {
       // TODO: this whole thing is extremely brittle.
       tmpHostname = s.substring(posHostname, s.indexOf(' ', posHostname));
-      tmpPort = Integer.parseInt(s.substring(posPort, s.indexOf('.', posPort)));
+      tmpPort = Integer.parseInt(s.substring(posPort, s.indexOf(' ', posPort)));
+      tmpStartCode = Long.parseLong(s.substring(posStartCode, s.indexOf('.', posStartCode)));
       tmpSeqNum = Long.parseLong(s.substring(posSeqNum, s.indexOf('.', posSeqNum)));
     } catch (Exception ignored) {
-      LOG.warn("Can't parse the hostname and the port from this string: " + s + ", continuing");
+      LOG.warn("Can't parse the hostname, port and startCode from this string: " +
+          s + ", continuing");
     }
 
     hostname = tmpHostname;
     port = tmpPort;
+    startCode = tmpStartCode;
     locationSeqNum = tmpSeqNum;
   }
 
@@ -92,8 +103,8 @@ public class RegionMovedException extends NotServingRegionException {
   public String getMessage() {
     // TODO: deserialization above depends on this. That is bad, but also means this
     // should be modified carefully.
-    return "Region moved to: " + HOST_FIELD + hostname + " " + PORT_FIELD + port + ". As of "
-        + LOCATIONSEQNUM_FIELD + locationSeqNum + ".";
+    return "Region moved to: " + HOST_FIELD + hostname + " " + PORT_FIELD + port + " " +
+        STARTCODE_FIELD + startCode + ". As of " + LOCATIONSEQNUM_FIELD + locationSeqNum + ".";
   }
 
   /**

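A small sketch of the intended round trip (illustrative only, values invented): the server side throws the exception with a ServerName, the message now carries the startCode field, and the string-parsing constructor on the client side should be able to rebuild the full ServerName.

    ServerName sn = new ServerName("rs1.example.com", 60020, 1361379600000L);
    RegionMovedException thrown = new RegionMovedException(sn, 12345L);
    // getMessage() renders roughly:
    // "Region moved to: hostname=rs1.example.com port=60020 startCode=1361379600000. As of locationSeqNum=12345."
    RegionMovedException parsed = new RegionMovedException(thrown.getMessage());
    ServerName rebuilt = parsed.getServerName();   // should compare equal to sn, startcode included
    long seqNum = parsed.getLocationSeqNum();      // 12345
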
@@ -497,7 +497,7 @@ public class CatalogTracker {
     }
     AdminProtocol protocol = null;
     try {
-      protocol = connection.getAdmin(sn.getHostname(), sn.getPort());
+      protocol = connection.getAdmin(sn);
     } catch (RetriesExhaustedException e) {
       if (e.getCause() != null && e.getCause() instanceof ConnectException) {
         // Catch this; presume it means the cached connection has gone bad.

@@ -556,7 +556,7 @@ public class HBaseAdmin implements Abortable, Closeable {
 
     // Wait until all regions deleted
     ClientProtocol server =
-        connection.getClient(firstMetaServer.getHostname(), firstMetaServer.getPort());
+        connection.getClient(firstMetaServer.getServerName());
     for (int tries = 0; tries < (this.numRetries * this.retryLongerMultiplier); tries++) {
       try {
 
@@ -1184,8 +1184,7 @@ public class HBaseAdmin implements Abortable, Closeable {
         "The servername cannot be null or empty.");
     }
     ServerName sn = new ServerName(serverName);
-    AdminProtocol admin = this.connection.getAdmin(
-        sn.getHostname(), sn.getPort());
+    AdminProtocol admin = this.connection.getAdmin(sn);
     // Close the region without updating zk state.
     CloseRegionRequest request =
       RequestConverter.buildCloseRegionRequest(encodedRegionName, false);
@@ -1211,7 +1210,7 @@ public class HBaseAdmin implements Abortable, Closeable {
   public void closeRegion(final ServerName sn, final HRegionInfo hri)
   throws IOException {
     AdminProtocol admin =
-      this.connection.getAdmin(sn.getHostname(), sn.getPort());
+      this.connection.getAdmin(sn);
     // Close the region without updating zk state.
     ProtobufUtil.closeRegion(admin, hri.getRegionName(), false);
   }
@@ -1222,7 +1221,7 @@ public class HBaseAdmin implements Abortable, Closeable {
   public List<HRegionInfo> getOnlineRegions(
       final ServerName sn) throws IOException {
     AdminProtocol admin =
-      this.connection.getAdmin(sn.getHostname(), sn.getPort());
+      this.connection.getAdmin(sn);
     return ProtobufUtil.getOnlineRegions(admin);
   }
 
@@ -1285,7 +1284,7 @@ public class HBaseAdmin implements Abortable, Closeable {
   private void flush(final ServerName sn, final HRegionInfo hri)
   throws IOException {
     AdminProtocol admin =
-      this.connection.getAdmin(sn.getHostname(), sn.getPort());
+      this.connection.getAdmin(sn);
     FlushRegionRequest request =
       RequestConverter.buildFlushRegionRequest(hri.getRegionName());
     try {
@@ -1455,7 +1454,7 @@ public class HBaseAdmin implements Abortable, Closeable {
       final boolean major, final byte [] family)
   throws IOException {
     AdminProtocol admin =
-      this.connection.getAdmin(sn.getHostname(), sn.getPort());
+      this.connection.getAdmin(sn);
     CompactRegionRequest request =
       RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
     try {
@@ -1733,7 +1732,7 @@ public class HBaseAdmin implements Abortable, Closeable {
   private void split(final ServerName sn, final HRegionInfo hri,
       byte[] splitPoint) throws IOException {
     AdminProtocol admin =
-      this.connection.getAdmin(sn.getHostname(), sn.getPort());
+      this.connection.getAdmin(sn);
     ProtobufUtil.split(admin, hri, splitPoint);
   }
 
@@ -1857,7 +1856,7 @@ public class HBaseAdmin implements Abortable, Closeable {
     String hostname = Addressing.parseHostname(hostnamePort);
     int port = Addressing.parsePort(hostnamePort);
     AdminProtocol admin =
-      this.connection.getAdmin(hostname, port);
+      this.connection.getAdmin(new ServerName(hostname, port, 0));
     StopServerRequest request = RequestConverter.buildStopServerRequest(
       "Called by admin client " + this.connection.toString());
     try {
@@ -1867,6 +1866,7 @@ public class HBaseAdmin implements Abortable, Closeable {
     }
   }
 
+
   /**
    * @return cluster status
    * @throws IOException if a remote or network exception occurs
@@ -1998,9 +1998,8 @@ public class HBaseAdmin implements Abortable, Closeable {
   public synchronized byte[][] rollHLogWriter(String serverName)
       throws IOException, FailedLogCloseException {
     ServerName sn = new ServerName(serverName);
-    AdminProtocol admin = this.connection.getAdmin(
-        sn.getHostname(), sn.getPort());
-    RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();;
+    AdminProtocol admin = this.connection.getAdmin(sn);
+    RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
     try {
       RollWALWriterResponse response = admin.rollWALWriter(null, request);
       int regionCount = response.getRegionToFlushCount();
@@ -2060,7 +2059,7 @@ public class HBaseAdmin implements Abortable, Closeable {
     } else {
       ServerName sn = regionServerPair.getSecond();
       AdminProtocol admin =
-        this.connection.getAdmin(sn.getHostname(), sn.getPort());
+        this.connection.getAdmin(sn);
       GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
         regionServerPair.getFirst().getRegionName(), true);
       GetRegionInfoResponse response = admin.getRegionInfo(null, request);
@@ -2076,7 +2075,7 @@ public class HBaseAdmin implements Abortable, Closeable {
     try {
       ServerName sn = pair.getSecond();
       AdminProtocol admin =
-        this.connection.getAdmin(sn.getHostname(), sn.getPort());
+        this.connection.getAdmin(sn);
       GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
         pair.getFirst().getRegionName(), true);
       GetRegionInfoResponse response = admin.getRegionInfo(null, request);

@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterAdminProtocol;
 import org.apache.hadoop.hbase.MasterMonitorProtocol;
 import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -208,11 +209,23 @@ public interface HConnection extends Abortable, Closeable {
    * @param port RegionServer port
    * @return proxy for HRegionServer
    * @throws IOException if a remote or network exception occurs
+   *
+   * @deprecated - use @link {#getAdmin(final ServerName serverName)} which takes into account
+   *   the startCode
    */
+  @Deprecated
   public AdminProtocol getAdmin(final String hostname, final int port)
   throws IOException;
 
+
+  /**
+   * Establishes a connection to the region server at the specified address.
+   * @param serverName
+   * @return proxy for HRegionServer
+   * @throws IOException if a remote or network exception occurs
+   */
+  public AdminProtocol getAdmin(final ServerName serverName)
+  throws IOException;
+
   /**
    * Establishes a connection to the region server at the specified address, and return
    * a region client protocol.
@@ -221,11 +234,25 @@ public interface HConnection extends Abortable, Closeable {
    * @param port RegionServer port
    * @return ClientProtocol proxy for RegionServer
    * @throws IOException if a remote or network exception occurs
+   *
+   * @deprecated - use @link {#getClient(final ServerName serverName)} which takes into account
+   *   the startCode
   */
+  @Deprecated
   public ClientProtocol getClient(final String hostname, final int port)
   throws IOException;
 
+
+  /**
+   * Establishes a connection to the region server at the specified address, and return
+   * a region client protocol.
+   *
+   * @param serverName
+   * @return ClientProtocol proxy for RegionServer
+   * @throws IOException if a remote or network exception occurs
+   *
+   */
+  public ClientProtocol getClient(final ServerName serverName) throws IOException;
+
   /**
    * Establishes a connection to the region server at the specified address.
    * @param hostname RegionServer hostname
@@ -233,9 +260,21 @@ public interface HConnection extends Abortable, Closeable {
    * @param getMaster - do we check if master is alive
    * @return proxy for HRegionServer
    * @throws IOException if a remote or network exception occurs
+   * @deprecated use @link {#getAdmin(final ServerName serverName, boolean getMaster)}
+   *   which takes into account the startCode.
   */
-  public AdminProtocol getAdmin(final String hostname,
-    final int port, boolean getMaster)
+  @Deprecated
+  public AdminProtocol getAdmin(final String hostname, final int port, boolean getMaster)
   throws IOException;
 
+  /**
+   * Establishes a connection to the region server at the specified address.
+   * @param serverName
+   * @param getMaster - do we check if master is alive
+   * @return proxy for HRegionServer
+   * @throws IOException if a remote or network exception occurs
+   */
+  public AdminProtocol getAdmin(final ServerName serverName, boolean getMaster)
+  throws IOException;
+
   /**

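A minimal sketch of the migration this interface change implies for callers (hypothetical caller code, not part of the patch; it assumes an already open HConnection named connection and example ServerName values):

    ServerName sn = new ServerName("rs1.example.com", 60020, 1361379600000L);

    // Deprecated: the startcode is lost, so a restarted server on the same host:port
    // cannot be told apart from the original one.
    AdminProtocol oldStyle = connection.getAdmin(sn.getHostname(), sn.getPort());

    // Preferred: the ServerName overloads keep host, port and startcode together.
    AdminProtocol newStyle = connection.getAdmin(sn);
    ClientProtocol client  = connection.getClient(sn);
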
@@ -85,7 +85,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDe
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.SoftValueSortedMap;
 import org.apache.hadoop.hbase.util.Triple;
@@ -959,8 +958,7 @@ public class HConnectionManager {
         LOG.debug("Looked up root region location, connection=" + this +
           "; serverName=" + ((servername == null) ? "null" : servername));
         if (servername == null) return null;
-        return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, servername.getHostname(),
-          servername.getPort(), 0);
+        return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, servername, 0);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return null;
@@ -1008,8 +1006,8 @@ public class HConnectionManager {
         return true; // don't cache it
       }
       // instantiate the location
-      HRegionLocation loc = new HRegionLocation(regionInfo, serverName.getHostname(),
-        serverName.getPort(), HRegionInfo.getSeqNumDuringOpen(result));
+      HRegionLocation loc = new HRegionLocation(regionInfo, serverName,
+        HRegionInfo.getSeqNumDuringOpen(result));
       // cache this meta entry
       cacheLocation(tableName, null, loc);
       return true;
@@ -1063,7 +1061,7 @@ public class HConnectionManager {
         // If null still, go around again.
         if (metaLocation == null) continue;
         ClientProtocol server =
-          getClient(metaLocation.getHostname(), metaLocation.getPort());
+          getClient(metaLocation.getServerName());
 
         Result regionInfoRow = null;
         // This block guards against two threads trying to load the meta
@@ -1133,8 +1131,8 @@ public class HConnectionManager {
         }
 
         // Instantiate the location
-        location = new HRegionLocation(regionInfo, serverName.getHostname(),
-          serverName.getPort(), HRegionInfo.getSeqNumDuringOpen(regionInfoRow));
+        location = new HRegionLocation(regionInfo, serverName,
+          HRegionInfo.getSeqNumDuringOpen(regionInfoRow));
         cacheLocation(tableName, null, location);
         return location;
       } catch (TableNotFoundException e) {
@@ -1218,7 +1216,7 @@ public class HConnectionManager {
         return possibleRegion;
       }
 
-      // Passed all the way through, so we got nothin - complete cache miss
+      // Passed all the way through, so we got nothing - complete cache miss
       return null;
     }
 
@@ -1368,24 +1366,46 @@ public class HConnectionManager {
     }
 
     @Override
-    public AdminProtocol getAdmin(final String hostname,
-        final int port) throws IOException {
-      return getAdmin(hostname, port, false);
+    @Deprecated
+    public AdminProtocol getAdmin(final String hostname, final int port) throws IOException {
+      return getAdmin(new ServerName(hostname, port, 0L));
     }
 
+    @Override
+    public AdminProtocol getAdmin(final ServerName serverName)
+    throws IOException {
+      return getAdmin(serverName, false);
+    }
+
     @Override
+    @Deprecated
     public ClientProtocol getClient(final String hostname, final int port)
     throws IOException {
       return (ClientProtocol)getProtocol(hostname, port, clientClass);
     }
 
+    @Override
+    public ClientProtocol getClient(final ServerName serverName)
+    throws IOException {
+      return (ClientProtocol)
+          getProtocol(serverName.getHostname(), serverName.getPort(), clientClass);
+    }
+
     @Override
+    @Deprecated
     public AdminProtocol getAdmin(final String hostname, final int port,
        final boolean master)
     throws IOException {
       return (AdminProtocol)getProtocol(hostname, port, adminClass);
     }
 
+    @Override
+    public AdminProtocol getAdmin(final ServerName serverName, final boolean master)
+    throws IOException {
+      return (AdminProtocol)getProtocol(
+          serverName.getHostname(), serverName.getPort(), adminClass);
+    }
+
     /**
      * Either the passed <code>isa</code> is null or <code>hostname</code>
      * can be but not both.
@@ -1757,8 +1777,7 @@ public class HConnectionManager {
 
           @Override
           public void connect(boolean reload) throws IOException {
-            server = connection.getClient(
-              loc.getHostname(), loc.getPort());
+            server = connection.getClient(loc.getServerName());
           }
         };
       return callable.withoutRetries();
@@ -1767,8 +1786,8 @@ public class HConnectionManager {
     }
 
     void updateCachedLocation(HRegionInfo hri, HRegionLocation source,
-        String hostname, int port, long seqNum) {
-      HRegionLocation newHrl = new HRegionLocation(hri, hostname, port, seqNum);
+        ServerName serverName, long seqNum) {
+      HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
       synchronized (this.cachedRegionLocations) {
         cacheLocation(hri.getTableName(), source, newHrl);
       }
@@ -1781,7 +1800,7 @@ public class HConnectionManager {
      */
    void deleteCachedLocation(HRegionInfo hri, HRegionLocation source) {
      boolean isStaleDelete = false;
-      HRegionLocation oldLocation = null;
+      HRegionLocation oldLocation;
      synchronized (this.cachedRegionLocations) {
        Map<byte[], HRegionLocation> tableLocations =
          getTableLocations(hri.getTableName());
@@ -1829,7 +1848,7 @@ public class HConnectionManager {
        LOG.info("Region " + regionInfo.getRegionNameAsString() + " moved to " +
          rme.getHostname() + ":" + rme.getPort() + " according to " + source.getHostnamePort());
        updateCachedLocation(
-          regionInfo, source, rme.getHostname(), rme.getPort(), rme.getLocationSeqNum());
+          regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
      } else {
        deleteCachedLocation(regionInfo, source);
      }
@@ -2124,8 +2143,7 @@ public class HConnectionManager {
        }
      }
 
-      public String getDescriptionAndClear()
-      {
+      public String getDescriptionAndClear(){
        if (exceptions.isEmpty()) {
          return "";
        }
@@ -2134,7 +2152,7 @@ public class HConnectionManager {
        actions.clear();
        addresses.clear();
        return result;
-      };
+      }
 
      private RetriesExhaustedWithDetailsException makeException() {
        return new RetriesExhaustedWithDetailsException(exceptions, actions, addresses);

@@ -87,8 +87,7 @@ public abstract class ServerCallable<T> implements Callable<T> {
   */
  public void connect(final boolean reload) throws IOException {
    this.location = connection.getRegionLocation(tableName, row, reload);
-    this.server = connection.getClient(location.getHostname(),
-      location.getPort());
+    this.server = connection.getClient(location.getServerName());
  }
 
  /** @return the server name

@@ -76,7 +76,7 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
   private EventHandlerListener listener;
 
   // Time to wait for events to happen, should be kept short
-  protected final int waitingTimeForEvents;
+  protected int waitingTimeForEvents;
 
   private final Span parent;
 
@@ -146,7 +146,10 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
     // Master controlled events to be executed on the master
     M_SERVER_SHUTDOWN (70, ExecutorType.MASTER_SERVER_OPERATIONS), // Master is processing shutdown of a RS
     M_META_SERVER_SHUTDOWN (72, ExecutorType.MASTER_META_SERVER_OPERATIONS), // Master is processing shutdown of RS hosting a meta region (-ROOT- or .META.).
-    M_MASTER_RECOVERY (73, ExecutorType.MASTER_SERVER_OPERATIONS); // Master is processing recovery of regions found in ZK RIT
+    M_MASTER_RECOVERY (73, ExecutorType.MASTER_SERVER_OPERATIONS), // Master is processing recovery of regions found in ZK RIT
+
+    // RS controlled events to be executed on the RS
+    RS_PARALLEL_SEEK (80, ExecutorType.RS_PARALLEL_SEEK);
 
     private final int code;
     private final ExecutorService.ExecutorType executor;
@@ -193,9 +196,11 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
     this.server = server;
     this.eventType = eventType;
     seqid = seqids.incrementAndGet();
+    if (server != null) {
       this.waitingTimeForEvents = server.getConfiguration().
           getInt("hbase.master.event.waiting.time", 1000);
+    }
   }
 
   public void run() {
     Span chunk = Trace.startSpan(Thread.currentThread().getName(), parent,

@@ -92,7 +92,8 @@ public class ExecutorService {
     RS_OPEN_META (22),
     RS_CLOSE_REGION (23),
     RS_CLOSE_ROOT (24),
-    RS_CLOSE_META (25);
+    RS_CLOSE_META (25),
+    RS_PARALLEL_SEEK (26);
 
     ExecutorType(int value) {}
 

@@ -2806,6 +2806,12 @@ public class AssignmentManager extends ZooKeeperListener {
             + " since it is not opening on the dead server any more: " + sn);
           it.remove();
         } else {
+          try{
+            // Delete the ZNode if exists
+            ZKAssign.deleteNodeFailSilent(watcher, hri);
+          } catch (KeeperException ke) {
+            server.abort("Unexpected ZK exception deleting node " + hri, ke);
+          }
           // Mark the region closed and assign it again by SSH
           regionStates.updateRegionState(hri, RegionState.State.CLOSED);
         }

@@ -683,7 +683,7 @@ public class ServerManager {
     AdminProtocol admin = this.serverConnections.get(sn);
     if (admin == null) {
       LOG.debug("New connection to " + sn.toString());
-      admin = this.connection.getAdmin(sn.getHostname(), sn.getPort());
+      admin = this.connection.getAdmin(sn);
       this.serverConnections.put(sn, admin);
     }
     return admin;

@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+
+/**
+ * Default StoreEngine creates the default compactor, policy, and store file manager.
+ */
+@InterfaceAudience.Private
+public class DefaultStoreEngine extends StoreEngine {
+  public DefaultStoreEngine(Configuration conf, Store store, KVComparator comparator) {
+    super(conf, store, comparator);
+  }
+
+  @Override
+  protected void createComponents(PP<StoreFileManager> storeFileManager,
+      PP<CompactionPolicy> compactionPolicy, PP<Compactor> compactor) {
+    storeFileManager.set(new DefaultStoreFileManager(this.comparator));
+    compactionPolicy.set(new DefaultCompactionPolicy(this.conf, this.store));
+    compactor.set(new DefaultCompactor(this.conf, this.store));
+  }
+}

@@ -1535,6 +1535,10 @@ public class HRegionServer implements ClientProtocol,
       conf.getInt("hbase.regionserver.executor.closeroot.threads", 1));
     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
+    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
+      this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
+        conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
+    }
 
     Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
       uncaughtExceptionHandler);
@@ -2552,8 +2556,7 @@ public class HRegionServer implements ClientProtocol,
     if (region == null) {
       MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
       if (moveInfo != null) {
-        throw new RegionMovedException(moveInfo.getServerName().getHostname(),
-          moveInfo.getServerName().getPort(), moveInfo.getSeqNum());
+        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
       } else {
         throw new NotServingRegionException("Region is not online: " + encodedRegionName);
       }

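The RS_PARALLEL_SEEK executor is only started when the feature is switched on; a short sketch of the relevant configuration (key names taken from the hunk above, values illustrative):

    Configuration conf = HBaseConfiguration.create();
    // Enable parallel seeking of store file scanners (off by default).
    conf.setBoolean("hbase.storescanner.parallel.seek.enable", true);
    // Size of the RS_PARALLEL_SEEK thread pool started by the region server (default 10).
    conf.setInt("hbase.storescanner.parallel.seek.threads", 16);
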
@@ -153,7 +153,7 @@ public class HStore implements Store {
   // Comparing KeyValues
   private final KeyValue.KVComparator comparator;
 
-  private Compactor compactor;
+  final Compactor compactor;
 
   private OffPeakCompactions offPeakCompactions;
 
@@ -222,7 +222,8 @@ public class HStore implements Store {
         "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
     }
 
-    this.storeFileManager = new DefaultStoreFileManager(this.comparator);
+    StoreEngine engine = StoreEngine.create(this, this.conf, this.comparator);
+    this.storeFileManager = engine.getStoreFileManager();
     this.storeFileManager.loadFiles(loadStoreFiles());
 
     // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
@@ -241,9 +242,9 @@ public class HStore implements Store {
             + HStore.flush_retries_number);
       }
     }
-    this.compactionPolicy = CompactionPolicy.create(this, conf);
+    this.compactionPolicy = engine.getCompactionPolicy();
     // Get the compaction tool instance for this policy
-    this.compactor = compactionPolicy.getCompactor();
+    this.compactor = engine.getCompactor();
   }
 
   /**
@@ -1672,6 +1673,7 @@ public class HStore implements Store {
   }
 
   @Override
+  // TODO: why is there this and also getNumberOfStorefiles?! Remove one.
   public int getStorefilesCount() {
     return this.storeFileManager.getStorefileCount();
   }

@@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.zookeeper.KeeperException;
@@ -92,4 +93,9 @@ public interface RegionServerServices extends OnlineRegions {
    * @return The RegionServer's "Leases" service
    */
   public Leases getLeases();
+
+  /**
+   * @return hbase executor service
+   */
+  public ExecutorService getExecutorService();
 }

@@ -0,0 +1,134 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * StoreEngine is a factory that can create the objects necessary for HStore to operate.
+ * Since not all compaction policies, compactors and store file managers are compatible,
+ * they are tied together and replaced together via StoreEngine-s.
+ */
+@InterfaceAudience.Private
+public abstract class StoreEngine {
+  protected final Store store;
+  protected final Configuration conf;
+  protected final KVComparator comparator;
+
+  private final PP<CompactionPolicy> compactionPolicy = new PP<CompactionPolicy>();
+  private final PP<Compactor> compactor = new PP<Compactor>();
+  private final PP<StoreFileManager> storeFileManager = new PP<StoreFileManager>();
+  private boolean isInitialized = false;
+
+  /**
+   * The name of the configuration parameter that specifies the class of
+   * a store engine that is used to manage and compact HBase store files.
+   */
+  public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";
+
+  private static final Class<? extends StoreEngine>
+      DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class;
+
+  /**
+   * @return Compaction policy to use.
+   */
+  public CompactionPolicy getCompactionPolicy() {
+    createComponentsOnce();
+    return this.compactionPolicy.get();
+  }
+
+  /**
+   * @return Compactor to use.
+   */
+  public Compactor getCompactor() {
+    createComponentsOnce();
+    return this.compactor.get();
+  }
+
+  /**
+   * @return Store file manager to use.
+   */
+  public StoreFileManager getStoreFileManager() {
+    createComponentsOnce();
+    return this.storeFileManager.get();
+  }
+
+  protected StoreEngine(Configuration conf, Store store, KVComparator comparator) {
+    this.store = store;
+    this.conf = conf;
+    this.comparator = comparator;
+  }
+
+  /**
+   * Create the StoreEngine's components.
+   * @param storeFileManager out parameter for StoreFileManager.
+   * @param compactionPolicy out parameter for CompactionPolicy.
+   * @param compactor out parameter for Compactor.
+   */
+  protected abstract void createComponents(PP<StoreFileManager> storeFileManager,
+      PP<CompactionPolicy> compactionPolicy, PP<Compactor> compactor);
+
+  private void createComponentsOnce() {
+    if (isInitialized) return;
+    createComponents(storeFileManager, compactionPolicy, compactor);
+    isInitialized = true;
+  }
+
+  /**
+   * Create the StoreEngine configured for the given Store.
+   * @param store The store. An unfortunate dependency needed due to it
+   *              being passed to coprocessors via the compactor.
+   * @param conf Store configuration.
+   * @param kvComparator KVComparator for storeFileManager.
+   * @return StoreEngine to use.
+   */
+  public static StoreEngine create(Store store, Configuration conf, KVComparator kvComparator)
+      throws IOException {
+    String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
+    try {
+      return ReflectionUtils.instantiateWithCustomCtor(className,
+          new Class[] { Configuration.class, Store.class, KVComparator.class },
+          new Object[] { conf, store, kvComparator });
+    } catch (Exception e) {
+      throw new IOException("Unable to load configured store engine '" + className + "'", e);
+    }
+  }
+
+  /**
+   * To allow StoreEngine-s to have custom dependencies between 3 components, we want to create
+   * them in one place. To return multiple, simulate C++ pointer to pointers/C# out params.
+   */
+  protected static class PP<T> {
+    private T t = null;
+    public void set(T t) {
+      this.t = t;
+    }
+    public T get() {
+      return this.t;
+    }
+  }
+}

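To illustrate the extension point added here, a hypothetical engine that only swaps in a custom compaction policy might look like the sketch below. MyStoreEngine and MyCompactionPolicy are invented names, the sketch assumes the class lives in the same package as the store classes, and everything else mirrors DefaultStoreEngine above.

    public class MyStoreEngine extends StoreEngine {
      public MyStoreEngine(Configuration conf, Store store, KVComparator comparator) {
        super(conf, store, comparator); // constructor shape required by StoreEngine.create()
      }

      @Override
      protected void createComponents(PP<StoreFileManager> storeFileManager,
          PP<CompactionPolicy> compactionPolicy, PP<Compactor> compactor) {
        storeFileManager.set(new DefaultStoreFileManager(this.comparator));
        compactionPolicy.set(new MyCompactionPolicy(this.conf, this.store)); // hypothetical policy
        compactor.set(new DefaultCompactor(this.conf, this.store));
      }
    }

    // Selected through configuration, for example:
    //   hbase.hstore.engine.class = org.apache.hadoop.hbase.regionserver.MyStoreEngine
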
@@ -20,9 +20,11 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,8 +33,10 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
@@ -59,6 +63,11 @@ public class StoreScanner extends NonLazyKeyValueScanner
   protected final boolean isGet;
   protected final boolean explicitColumnQuery;
   protected final boolean useRowColBloom;
+  /**
+   * A flag that enables StoreFileScanner parallel-seeking
+   */
+  protected boolean isParallelSeekEnabled = false;
+  protected ExecutorService executor;
   protected final Scan scan;
   protected final NavigableSet<byte[]> columns;
   protected final long oldestUnexpiredTS;
@@ -66,6 +75,8 @@ public class StoreScanner extends NonLazyKeyValueScanner
 
   /** We don't ever expect to change this, the constant is just for clarity. */
   static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
+  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
+      "hbase.storescanner.parallel.seek.enable";
 
   /** Used during unit testing to ensure that lazy seek does save seek ops */
   protected static boolean lazySeekEnabledGlobally =
@@ -92,6 +103,17 @@ public class StoreScanner extends NonLazyKeyValueScanner
     // for multi-row (non-"get") scans because this is not done in
     // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
     useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
+    // The parallel-seeking is on :
+    // 1) the config value is *true*
+    // 2) store has more than one store file
+    if (store != null && ((HStore)store).getHRegion() != null
+        && store.getStorefilesCount() > 1) {
+      RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
+      if (rsService == null || !rsService.getConfiguration().getBoolean(
+            STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
+      isParallelSeekEnabled = true;
+      executor = rsService.getExecutorService();
+    }
   }
 
   /**
@@ -127,9 +149,13 @@ public class StoreScanner extends NonLazyKeyValueScanner
         scanner.requestSeek(matcher.getStartKey(), false, true);
       }
     } else {
+      if (!isParallelSeekEnabled) {
       for (KeyValueScanner scanner : scanners) {
         scanner.seek(matcher.getStartKey());
       }
+      } else {
+        parallelSeek(scanners, matcher.getStartKey());
+      }
     }
 
     // set storeLimit
@@ -166,9 +192,13 @@ public class StoreScanner extends NonLazyKeyValueScanner
     scanners = selectScannersFrom(scanners);
 
     // Seek all scanners to the initial key
+    if (!isParallelSeekEnabled) {
     for (KeyValueScanner scanner : scanners) {
       scanner.seek(matcher.getStartKey());
     }
+    } else {
+      parallelSeek(scanners, matcher.getStartKey());
+    }
 
     // Combine all seeked scanners with a heap
     heap = new KeyValueHeap(scanners, store.getComparator());
@@ -193,9 +223,13 @@ public class StoreScanner extends NonLazyKeyValueScanner
         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS);
 
     // Seek all scanners to the initial key
+    if (!isParallelSeekEnabled) {
     for (KeyValueScanner scanner : scanners) {
       scanner.seek(matcher.getStartKey());
     }
+    } else {
+      parallelSeek(scanners, matcher.getStartKey());
+    }
     heap = new KeyValueHeap(scanners, scanInfo.getComparator());
   }
 
@@ -513,9 +547,13 @@ public class StoreScanner extends NonLazyKeyValueScanner
      * could have done it now by storing the scan object from the constructor */
     List<KeyValueScanner> scanners = getScannersNoCompaction();
 
+    if (!isParallelSeekEnabled) {
     for (KeyValueScanner scanner : scanners) {
       scanner.seek(lastTopKey);
     }
+    } else {
+      parallelSeek(scanners, lastTopKey);
+    }
 
     // Combine all seeked scanners with a heap
     heap = new KeyValueHeap(scanners, store.getComparator());
@@ -546,9 +584,8 @@ public class StoreScanner extends NonLazyKeyValueScanner
     checkReseek();
     if (explicitColumnQuery && lazySeekEnabledGlobally) {
       return heap.requestSeek(kv, true, useRowColBloom);
-    } else {
-      return heap.reseek(kv);
     }
+    return heap.reseek(kv);
   }
 
   @Override
@@ -556,6 +593,44 @@ public class StoreScanner extends NonLazyKeyValueScanner
     return 0;
   }
 
+  /**
+   * Seek storefiles in parallel to optimize IO latency as much as possible
+   * @param scanners the list {@link KeyValueScanner}s to be read from
+   * @param kv the KeyValue on which the operation is being requested
+   * @throws IOException
+   */
+  private void parallelSeek(final List<? extends KeyValueScanner>
+      scanners, final KeyValue kv) throws IOException {
+    if (scanners.isEmpty()) return;
+    int storeFileScannerCount = scanners.size();
+    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
+    List<ParallelSeekHandler> handlers =
+        new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
+    for (KeyValueScanner scanner : scanners) {
+      if (scanner instanceof StoreFileScanner) {
+        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
+            MultiVersionConsistencyControl.getThreadReadPoint(), latch);
+        executor.submit(seekHandler);
+        handlers.add(seekHandler);
+      } else {
+        scanner.seek(kv);
+        latch.countDown();
+      }
+    }
+
+    try {
+      latch.await();
+    } catch (InterruptedException ie) {
+      throw new InterruptedIOException(ie.getMessage());
+    }
+
+    for (ParallelSeekHandler handler : handlers) {
+      if (handler.getErr() != null) {
+        throw new IOException(handler.getErr());
+      }
+    }
+  }
+
   /**
    * Used in testing.
    * @return all scanners in no particular order

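The parallelSeek() helper above fans each StoreFileScanner seek out to the shared executor and then waits on a CountDownLatch. The same fan-out/fan-in pattern in isolation looks roughly like the sketch below; it is generic code against plain java.util.concurrent (not the HBase ExecutorService), and error propagation via ParallelSeekHandler.getErr() is omitted.

    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;

    final class ParallelSeekSketch {
      // Submit one task per scanner, then block until every task has counted the latch down.
      static void seekAll(ExecutorService pool, List<Runnable> seeks) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(seeks.size());
        for (final Runnable seek : seeks) {
          pool.submit(new Runnable() {
            public void run() {
              try {
                seek.run();          // one StoreFileScanner seek in the real code
              } finally {
                latch.countDown();   // always count down so await() cannot hang
              }
            }
          });
        }
        latch.await();               // wait for all submitted seeks to complete
      }
    }
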
@@ -25,33 +25,22 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * A compaction policy determines how to select files for compaction,
  * how to compact them, and how to generate the compacted files.
  */
 @InterfaceAudience.Private
-public abstract class CompactionPolicy extends Configured {
+public abstract class CompactionPolicy {
+  protected CompactionConfiguration comConf;
+  protected StoreConfigInformation storeConfigInfo;
 
-  /**
-   * The name of the configuration parameter that specifies
-   * the class of a compaction policy that is used to compact
-   * HBase store files.
-   */
-  public static final String COMPACTION_POLICY_KEY =
-    "hbase.hstore.compaction.policy";
-
-  private static final Class<? extends CompactionPolicy>
-    DEFAULT_COMPACTION_POLICY_CLASS = DefaultCompactionPolicy.class;
-
-  CompactionConfiguration comConf;
-  Compactor compactor;
-  HStore store;
+  public CompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
+    this.storeConfigInfo = storeConfigInfo;
+    this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo);
+  }
 
   /**
    * This is called before coprocessor preCompactSelection and should filter the candidates
@@ -107,68 +96,7 @@ public abstract class CompactionPolicy extends Configured {
    * Inform the policy that some configuration has been change,
    * so cached value should be updated it any.
    */
   public void updateConfiguration() {
-    if (getConf() != null && store != null) {
-      comConf = new CompactionConfiguration(getConf(), store);
-    }
-  }
-
-  /**
-   * Get the compactor for this policy
-   * @return the compactor for this policy
-   */
-  public Compactor getCompactor() {
-    return compactor;
-  }
-
-  /**
-   * Set the new configuration
-   */
-  @Override
-  public void setConf(Configuration conf) {
-    super.setConf(conf);
-    updateConfiguration();
-  }
-
-  /**
-   * Upon construction, this method will be called with the HStore
-   * to be governed. It will be called once and only once.
-   */
-  protected void configureForStore(HStore store) {
-    this.store = store;
-    updateConfiguration();
-  }
-
-  /**
-   * Create the CompactionPolicy configured for the given HStore.
-   * @param store
-   * @param conf
-   * @return a CompactionPolicy
-   * @throws IOException
-   */
-  public static CompactionPolicy create(HStore store,
-      Configuration conf) throws IOException {
-    Class<? extends CompactionPolicy> clazz =
-        getCompactionPolicyClass(store.getFamily(), conf);
-    CompactionPolicy policy = ReflectionUtils.newInstance(clazz, conf);
-    policy.configureForStore(store);
-    return policy;
-  }
-
-  static Class<? extends CompactionPolicy> getCompactionPolicyClass(
-      HColumnDescriptor family, Configuration conf) throws IOException {
-    String className = conf.get(COMPACTION_POLICY_KEY,
-        DEFAULT_COMPACTION_POLICY_CLASS.getName());
-
-    try {
-      Class<? extends CompactionPolicy> clazz =
-          Class.forName(className).asSubclass(CompactionPolicy.class);
-      return clazz;
-    } catch (Exception e) {
-      throw new IOException(
-          "Unable to load configured region compaction policy '"
-              + className + "' for column '" + family.getNameAsString()
-              + "'", e);
-    }
+    this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo);
   }
 }

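Under the new contract a policy no longer extends Configured or carries its own Compactor; a subclass simply accepts the configuration and store information in its constructor, as DefaultCompactionPolicy does further down. A bare-bones sketch with a hypothetical class name:

    public class MyCompactionPolicy extends DefaultCompactionPolicy {
      public MyCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
        super(conf, storeConfigInfo);  // comConf and storeConfigInfo are set up by CompactionPolicy
      }
      // Override selectCompaction(...) or the major-compaction checks as needed.
    }
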
@@ -32,11 +32,11 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 @InterfaceAudience.Private
 public abstract class Compactor {
 
-  CompactionProgress progress;
-  CompactionPolicy policy;
+  protected CompactionProgress progress;
+  protected Configuration conf;
 
-  Compactor(final CompactionPolicy policy) {
-    this.policy = policy;
+  Compactor(final Configuration conf) {
+    this.conf = conf;
   }
 
   /**
@@ -51,10 +51,6 @@ public abstract class Compactor {
   public abstract List<Path> compact(final Collection<StoreFile> filesToCompact,
     final boolean majorCompaction) throws IOException;
 
-  public Configuration getConf() {
-    return policy.getConf();
-  }
-
   public CompactionProgress getProgress() {
     return this.progress;
   }

@@ -30,7 +30,10 @@ import java.util.Random;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -49,8 +52,8 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
 
   private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicy.class);
 
-  public DefaultCompactionPolicy() {
-    compactor = new DefaultCompactor(this);
+  public DefaultCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
+    super(conf, storeConfigInfo);
   }
 
   @Override
@@ -78,12 +81,13 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
    * @return subset copy of candidate list that meets compaction criteria
    * @throws java.io.IOException
    */
+  @Override
   public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
       final boolean isUserCompaction, final boolean mayUseOffPeak, final boolean forceMajor)
       throws IOException {
     // Preliminary compaction subject to filters
     CompactSelection candidateSelection = new CompactSelection(candidateFiles);
-    long cfTtl = this.store.getStoreFileTtl();
+    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
     if (!forceMajor) {
       // If there are expired files, only select them so that compaction deletes them
       if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
@@ -326,7 +330,7 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
     long now = System.currentTimeMillis();
     if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
       // Major compaction time has elapsed.
-      long cfTtl = this.store.getStoreFileTtl();
+      long cfTtl = this.storeConfigInfo.getStoreFileTtl();
       if (filesToCompact.size() == 1) {
         // Single file
         StoreFile sf = filesToCompact.iterator().next();

@ -26,6 +26,7 @@ import java.util.List;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
@ -44,14 +45,16 @@ import org.apache.hadoop.util.StringUtils;
|
|||
|
||||
/**
|
||||
* Compact passed set of files.
|
||||
* Create an instance and then call {@ink #compact(Collection, boolean, long)}.
|
||||
* Create an instance and then call {@link #compact(Collection, boolean)}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class DefaultCompactor extends Compactor {
|
||||
public class DefaultCompactor extends Compactor {
|
||||
private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
|
||||
private final Store store;
|
||||
|
||||
DefaultCompactor(final CompactionPolicy policy) {
|
||||
super(policy);
|
||||
public DefaultCompactor(final Configuration conf, final Store store) {
|
||||
super(conf);
|
||||
this.store = store;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -72,7 +75,6 @@ class DefaultCompactor extends Compactor {
|
|||
// Calculate maximum key count after compaction (for blooms)
|
||||
// Also calculate earliest put timestamp if major compaction
|
||||
int maxKeyCount = 0;
|
||||
Store store = policy.store;
|
||||
long earliestPutTs = HConstants.LATEST_TIMESTAMP;
|
||||
for (StoreFile file: filesToCompact) {
|
||||
StoreFile.Reader r = file.getReader();
|
||||
|
@ -116,7 +118,7 @@ class DefaultCompactor extends Compactor {
|
|||
.getScannersForStoreFiles(filesToCompact, false, false, true);
|
||||
|
||||
// Get some configs
|
||||
int compactionKVMax = getConf().getInt(HConstants.COMPACTION_KV_MAX, 10);
|
||||
int compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
|
||||
Compression.Algorithm compression = store.getFamily().getCompression();
|
||||
// Avoid overriding compression setting for major compactions if the user
|
||||
// has not specified it separately
|
||||
|
|
|
@@ -0,0 +1,73 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.handler;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;

/**
* Handler to seek storefiles in parallel.
*/
@InterfaceAudience.Private
public class ParallelSeekHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(ParallelSeekHandler.class);
private KeyValueScanner scanner;
private KeyValue keyValue;
private long readPoint;
private CountDownLatch latch;
private Throwable err = null;

public ParallelSeekHandler(KeyValueScanner scanner,KeyValue keyValue,
long readPoint, CountDownLatch latch) {
super(null, EventType.RS_PARALLEL_SEEK);
this.scanner = scanner;
this.keyValue = keyValue;
this.readPoint = readPoint;
this.latch = latch;
}

@Override
public void process() {
try {
MultiVersionConsistencyControl.setThreadReadPoint(readPoint);
scanner.seek(keyValue);
} catch (IOException e) {
LOG.error("", e);
setErr(e);
} finally {
latch.countDown();
}
}

public Throwable getErr() {
return err;
}

public void setErr(Throwable err) {
this.err = err;
}
}
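Note: ParallelSeekHandler coordinates with its caller through a CountDownLatch, counting down in a finally block so the caller is released even when a seek fails. A minimal, self-contained sketch of that coordination pattern follows; the pool size and the printed work are stand-ins for the real store file scanners, and no HBase classes are used.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelSeekLatchSketch {
  public static void main(String[] args) throws InterruptedException {
    final int scannerCount = 4; // stand-in for the number of store file scanners
    final CountDownLatch latch = new CountDownLatch(scannerCount);
    ExecutorService pool = Executors.newFixedThreadPool(scannerCount);
    for (int i = 0; i < scannerCount; i++) {
      final int id = i;
      pool.submit(new Runnable() {
        public void run() {
          try {
            // A real handler would call scanner.seek(keyValue) here.
            System.out.println("seeking scanner " + id);
          } finally {
            // Count down in finally so the waiting caller is always released,
            // just as ParallelSeekHandler.process() does above.
            latch.countDown();
          }
        }
      });
    }
    // The caller blocks until every parallel seek has finished (or the timeout fires).
    latch.await(60, TimeUnit.SECONDS);
    pool.shutdown();
  }
}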
@@ -764,7 +764,7 @@ public class ReplicationSource extends Thread
}
ServerName address =
currentPeers.get(random.nextInt(this.currentPeers.size()));
return this.conn.getAdmin(address.getHostname(), address.getPort());
return this.conn.getAdmin(address);
}

/**

@@ -3031,7 +3031,7 @@ public class HBaseFsck extends Configured implements Tool {
errors.progress();
try {
AdminProtocol server =
connection.getAdmin(rsinfo.getHostname(), rsinfo.getPort());
connection.getAdmin(rsinfo);

// list all online regions from this region server
List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(server);

@@ -149,7 +149,7 @@ public class HBaseFsckRepair {
public static void closeRegionSilentlyAndWait(HBaseAdmin admin,
ServerName server, HRegionInfo region) throws IOException, InterruptedException {
HConnection connection = admin.getConnection();
AdminProtocol rs = connection.getAdmin(server.getHostname(), server.getPort());
AdminProtocol rs = connection.getAdmin(server);
ProtobufUtil.closeRegion(rs, region.getRegionName(), false);
long timeout = admin.getConfiguration()
.getLong("hbase.hbck.close.timeout", 120000);

@@ -124,7 +124,7 @@ public class MiniZooKeeperCluster {
// resulting in test failure (client timeout on first session).
// set env and directly in order to handle static init/gc issues
System.setProperty("zookeeper.preAllocSize", "100");
FileTxnLog.setPreallocSize(100);
FileTxnLog.setPreallocSize(100 * 1024);
}

public int startup(File baseDir) throws IOException, InterruptedException {
@@ -441,6 +441,21 @@
Set to 0 to disable automated major compactions.
</description>
</property>
<property>
<name>hbase.storescanner.parallel.seek.enable</name>
<value>false</value>
<description>
Enables StoreFileScanner parallel-seeking in StoreScanner,
a feature which can reduce response latency under special conditions.
</description>
</property>
<property>
<name>hbase.storescanner.parallel.seek.threads</name>
<value>10</value>
<description>
The default thread pool size if parallel-seeking feature enabled.
</description>
</property>
<property>
<name>hbase.mapreduce.hfileoutputformat.blocksize</name>
<value>65536</value>
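Note: the two property names above come straight from the hbase-default.xml hunk; they can also be set programmatically. A small sketch, assuming hadoop-common is on the classpath (in a real deployment the values would normally go into hbase-site.xml on the region servers instead):

import org.apache.hadoop.conf.Configuration;

public class ParallelSeekConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The two keys added to hbase-default.xml above.
    conf.setBoolean("hbase.storescanner.parallel.seek.enable", true);
    conf.setInt("hbase.storescanner.parallel.seek.threads", 10);
    System.out.println("parallel seek enabled: "
        + conf.getBoolean("hbase.storescanner.parallel.seek.enable", false));
  }
}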
@@ -1867,6 +1867,11 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return HFileSystem.get(conf);
}

public void waitTableAvailable(byte[] table)
throws InterruptedException, IOException {
waitTableAvailable(table, 30000);
}

public void waitTableAvailable(byte[] table, long timeoutMillis)
throws InterruptedException, IOException {
long startWait = System.currentTimeMillis();

@@ -1878,6 +1883,11 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
}
}

public void waitTableEnabled(byte[] table)
throws InterruptedException, IOException {
waitTableEnabled(table, 30000);
}

public void waitTableEnabled(byte[] table, long timeoutMillis)
throws InterruptedException, IOException {
long startWait = System.currentTimeMillis();
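Note: the new no-timeout overloads above simply delegate with a 30000 ms default. The waiting itself is a plain poll-until-deadline loop; a self-contained sketch of that shape follows, with a toy condition standing in for the real table-availability check.

public class WaitForConditionSketch {
  interface Condition { boolean ready() throws Exception; }

  // Poll until the condition holds or the timeout elapses, the same shape as the
  // waitTableAvailable/waitTableEnabled overloads added above (default 30000 ms).
  static boolean waitFor(Condition condition, long timeoutMillis) throws Exception {
    long startWait = System.currentTimeMillis();
    while (!condition.ready()) {
      if (System.currentTimeMillis() - startWait > timeoutMillis) {
        return false;
      }
      Thread.sleep(200);
    }
    return true;
  }

  public static void main(String[] args) throws Exception {
    final long start = System.currentTimeMillis();
    boolean ok = waitFor(new Condition() {
      public boolean ready() {
        return System.currentTimeMillis() - start > 1000; // toy condition
      }
    }, 30000);
    System.out.println("condition met: " + ok);
  }
}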
@@ -37,24 +37,24 @@ public class TestHRegionLocation {
public void testHashAndEqualsCode() {
ServerName hsa1 = new ServerName("localhost", 1234, -1L);
HRegionLocation hrl1 = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
hsa1.getHostname(), hsa1.getPort());
hsa1, HConstants.NO_SEQNUM);
HRegionLocation hrl2 = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
hsa1.getHostname(), hsa1.getPort());
hsa1, HConstants.NO_SEQNUM);
assertEquals(hrl1.hashCode(), hrl2.hashCode());
assertTrue(hrl1.equals(hrl2));
HRegionLocation hrl3 = new HRegionLocation(HRegionInfo.ROOT_REGIONINFO,
hsa1.getHostname(), hsa1.getPort());
hsa1, HConstants.NO_SEQNUM);
assertNotSame(hrl1, hrl3);
// They are equal because they have same location even though they are
// carrying different regions or timestamp.
assertTrue(hrl1.equals(hrl3));
ServerName hsa2 = new ServerName("localhost", 12345, -1L);
HRegionLocation hrl4 = new HRegionLocation(HRegionInfo.ROOT_REGIONINFO,
hsa2.getHostname(), hsa2.getPort());
hsa2, HConstants.NO_SEQNUM);
// These have same HRI but different locations so should be different.
assertFalse(hrl3.equals(hrl4));
HRegionLocation hrl5 = new HRegionLocation(hrl4.getRegionInfo(),
hrl4.getHostname(), hrl4.getPort(), hrl4.getSeqNum() + 1);
hrl4.getServerName(), hrl4.getSeqNum() + 1);
assertTrue(hrl4.equals(hrl5));
}

@@ -62,7 +62,7 @@ public class TestHRegionLocation {
public void testToString() {
ServerName hsa1 = new ServerName("localhost", 1234, -1L);
HRegionLocation hrl1 = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
hsa1.getHostname(), hsa1.getPort());
hsa1, HConstants.NO_SEQNUM);
System.out.println(hrl1.toString());
}

@@ -70,10 +70,10 @@ public class TestHRegionLocation {
public void testCompareTo() {
ServerName hsa1 = new ServerName("localhost", 1234, -1L);
HRegionLocation hsl1 =
new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, hsa1.getHostname(), hsa1.getPort());
new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, hsa1, HConstants.NO_SEQNUM);
ServerName hsa2 = new ServerName("localhost", 1235, -1L);
HRegionLocation hsl2 =
new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, hsa2.getHostname(), hsa2.getPort());
new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, hsa2, HConstants.NO_SEQNUM);
assertTrue(hsl1.compareTo(hsl1) == 0);
assertTrue(hsl2.compareTo(hsl2) == 0);
int compare1 = hsl1.compareTo(hsl2);
@@ -339,8 +339,7 @@ public class TestCatalogTracker {
Mockito.mock(AdminProtocol.class);
Mockito.when(implementation.getRegionInfo((RpcController)Mockito.any(),
(GetRegionInfoRequest)Mockito.any())).thenThrow(connectException);
Mockito.when(connection.getAdmin(Mockito.anyString(),
Mockito.anyInt(), Mockito.anyBoolean())).
Mockito.when(connection.getAdmin(Mockito.any(ServerName.class), Mockito.anyBoolean())).
thenReturn(implementation);
final CatalogTracker ct = constructAndStartCatalogTracker(connection);
try {

@@ -475,8 +474,8 @@ public class TestCatalogTracker {
* {@link HConnection#getConfiguration()} is called, a 'location' when
* {@link HConnection#getRegionLocation(byte[], byte[], boolean)} is called,
* and that returns the passed {@link AdminProtocol} instance when
* {@link HConnection#getAdmin(String, int)} is called, returns the passed
* {@link ClientProtocol} instance when {@link HConnection#getClient(String, int)}
* {@link HConnection#getAdmin(ServerName)} is called, returns the passed
* {@link ClientProtocol} instance when {@link HConnection#getClient(ServerName)}
* is called (Be sure call
* {@link HConnectionManager#deleteConnection(org.apache.hadoop.conf.Configuration)}
* when done with this mocked Connection.

@@ -489,8 +488,7 @@ public class TestCatalogTracker {
Mockito.doNothing().when(connection).close();
// Make it so we return any old location when asked.
final HRegionLocation anyLocation =
new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, SN.getHostname(),
SN.getPort());
new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, SN, HConstants.NO_SEQNUM);
Mockito.when(connection.getRegionLocation((byte[]) Mockito.any(),
(byte[]) Mockito.any(), Mockito.anyBoolean())).
thenReturn(anyLocation);

@@ -499,12 +497,12 @@ public class TestCatalogTracker {
thenReturn(anyLocation);
if (admin != null) {
// If a call to getHRegionConnection, return this implementation.
Mockito.when(connection.getAdmin(Mockito.anyString(), Mockito.anyInt())).
Mockito.when(connection.getAdmin(Mockito.any(ServerName.class))).
thenReturn(admin);
}
if (client != null) {
// If a call to getClient, return this implementation.
Mockito.when(connection.getClient(Mockito.anyString(), Mockito.anyInt())).
Mockito.when(connection.getClient(Mockito.any(ServerName.class))).
thenReturn(client);
}
return connection;

@@ -120,7 +120,7 @@ public class TestMetaReaderEditorNoCluster {
/**
* Test that MetaReader will ride over server throwing
* "Server not running" IOEs.
* @see https://issues.apache.org/jira/browse/HBASE-3446
* @see @link {https://issues.apache.org/jira/browse/HBASE-3446}
* @throws IOException
* @throws InterruptedException
*/

@@ -133,7 +133,7 @@ public class TestMetaReaderEditorNoCluster {
// This is a servername we use in a few places below.
ServerName sn = new ServerName("example.com", 1234, System.currentTimeMillis());

HConnection connection = null;
HConnection connection;
CatalogTracker ct = null;
try {
// Mock an ClientProtocol. Our mock implementation will fail a few

@@ -178,8 +178,7 @@ public class TestMetaReaderEditorNoCluster {
// Fix the location lookup so it 'works' though no network. First
// make an 'any location' object.
final HRegionLocation anyLocation =
new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, sn.getHostname(),
sn.getPort());
new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, sn, HConstants.NO_SEQNUM);
// Return the any location object when locateRegion is called in HTable
// constructor and when its called by ServerCallable (it uses getRegionLocation).
// The ugly format below comes of 'Important gotcha on spying real objects!' from

@@ -192,7 +191,7 @@ public class TestMetaReaderEditorNoCluster {

// Now shove our HRI implementation into the spied-upon connection.
Mockito.doReturn(implementation).
when(connection).getClient(Mockito.anyString(), Mockito.anyInt());
when(connection).getClient(Mockito.any(ServerName.class));

// Now start up the catalogtracker with our doctored Connection.
ct = new CatalogTracker(zkw, null, connection, ABORTABLE);
@@ -20,12 +20,11 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
import org.mockito.Mockito;

@@ -44,7 +43,7 @@ public class HConnectionTestingUtility {
* configuration instance. Minimally the mock will return
* <code>conf</conf> when {@link HConnection#getConfiguration()} is invoked.
* Be sure to shutdown the connection when done by calling
* {@link HConnectionManager#deleteConnection(Configuration, boolean)} else it
* {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)} else it
* will stick around; this is probably not what you want.
* @param conf configuration
* @return HConnection object for <code>conf</code>

@@ -70,7 +69,7 @@ public class HConnectionTestingUtility {
* more of the popular {@link HConnection} methods so they do 'normal'
* operation (see return doc below for list). Be sure to shutdown the
* connection when done by calling
* {@link HConnectionManager#deleteConnection(Configuration, boolean)} else it
* {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)} else it
* will stick around; this is probably not what you want.
*
* @param conf Configuration to use

@@ -86,10 +85,10 @@ public class HConnectionTestingUtility {
* {@link HConnection#getConfiguration()} is called, a 'location' when
* {@link HConnection#getRegionLocation(byte[], byte[], boolean)} is called,
* and that returns the passed {@link AdminProtocol} instance when
* {@link HConnection#getAdmin(String, int)} is called, returns the passed
* {@link ClientProtocol} instance when {@link HConnection#getClient(String, int)}
* {@link HConnection#getAdmin(ServerName)} is called, returns the passed
* {@link ClientProtocol} instance when {@link HConnection#getClient(ServerName)}
* is called (Be sure call
* {@link HConnectionManager#deleteConnection(org.apache.hadoop.conf.Configuration, boolean)}
* {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)}
* when done with this mocked Connection.
* @throws IOException
*/

@@ -100,7 +99,7 @@ public class HConnectionTestingUtility {
HConnection c = HConnectionTestingUtility.getMockedConnection(conf);
Mockito.doNothing().when(c).close();
// Make it so we return a particular location when asked.
final HRegionLocation loc = new HRegionLocation(hri, sn.getHostname(), sn.getPort());
final HRegionLocation loc = new HRegionLocation(hri, sn, HConstants.NO_SEQNUM);
Mockito.when(c.getRegionLocation((byte[]) Mockito.any(),
(byte[]) Mockito.any(), Mockito.anyBoolean())).
thenReturn(loc);

@@ -108,12 +107,12 @@ public class HConnectionTestingUtility {
thenReturn(loc);
if (admin != null) {
// If a call to getAdmin, return this implementation.
Mockito.when(c.getAdmin(Mockito.anyString(), Mockito.anyInt())).
Mockito.when(c.getAdmin(Mockito.any(ServerName.class))).
thenReturn(admin);
}
if (client != null) {
// If a call to getClient, return this client.
Mockito.when(c.getClient(Mockito.anyString(), Mockito.anyInt())).
Mockito.when(c.getClient(Mockito.any(ServerName.class))).
thenReturn(client);
}
return c;

@@ -123,12 +122,13 @@ public class HConnectionTestingUtility {
* Get a Mockito spied-upon {@link HConnection} that goes with the passed
* <code>conf</code> configuration instance.
* Be sure to shutdown the connection when done by calling
* {@link HConnectionManager#deleteConnection(Configuration, boolean)} else it
* {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)} else it
* will stick around; this is probably not what you want.
* @param conf configuration
* @return HConnection object for <code>conf</code>
* @throws ZooKeeperConnectionException
* @see http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html#spy(T)
* @see @link
* {http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html#spy(T)}
*/
public static HConnection getSpiedConnection(final Configuration conf)
throws ZooKeeperConnectionException {
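Note: the recurring change in these test utilities is stubbing getAdmin/getClient with a Mockito.any(ServerName.class) matcher instead of hostname and port matchers. A stripped-down sketch of that stubbing pattern follows; the Server/Admin/Conn interfaces are stand-ins for the real HBase types, and Mockito (1.x era) is assumed to be on the classpath.

import org.mockito.Mockito;

public class MockedConnectionSketch {
  public interface Server { }
  public interface Admin { }
  public interface Conn { Admin getAdmin(Server server); }

  public static void main(String[] args) {
    Conn conn = Mockito.mock(Conn.class);
    Admin admin = Mockito.mock(Admin.class);
    // Match any argument of the declared type, mirroring
    // Mockito.when(c.getAdmin(Mockito.any(ServerName.class))) above.
    Mockito.when(conn.getAdmin(Mockito.any(Server.class))).thenReturn(admin);
    Server anyServer = Mockito.mock(Server.class);
    System.out.println(conn.getAdmin(anyServer) == admin);
  }
}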
@@ -115,8 +115,7 @@ public class TestFromClientSide3 {
HConnection conn = HConnectionManager.getConnection(TEST_UTIL
.getConfiguration());
HRegionLocation loc = table.getRegionLocation(row, true);
AdminProtocol server = conn.getAdmin(loc.getHostname(), loc
.getPort());
AdminProtocol server = conn.getAdmin(loc.getServerName());
byte[] regName = loc.getRegionInfo().getRegionName();

for (int i = 0; i < nFlushes; i++) {

@@ -163,8 +162,7 @@ public class TestFromClientSide3 {
// Verify we have multiple store files.
HRegionLocation loc = hTable.getRegionLocation(row, true);
byte[] regionName = loc.getRegionInfo().getRegionName();
AdminProtocol server = connection.getAdmin(
loc.getHostname(), loc.getPort());
AdminProtocol server = connection.getAdmin(loc.getServerName());
assertTrue(ProtobufUtil.getStoreFiles(
server, regionName, FAMILY).size() > 1);

@@ -177,7 +175,7 @@ public class TestFromClientSide3 {
loc = hTable.getRegionLocation(row, true);
if (!loc.getRegionInfo().isOffline()) {
regionName = loc.getRegionInfo().getRegionName();
server = connection.getAdmin(loc.getHostname(), loc.getPort());
server = connection.getAdmin(loc.getServerName());
if (ProtobufUtil.getStoreFiles(
server, regionName, FAMILY).size() <= 1) {
break;

@@ -211,7 +209,7 @@ public class TestFromClientSide3 {
Thread.sleep(10 * 1000);
loc = hTable.getRegionLocation(row, true);
regionName = loc.getRegionInfo().getRegionName();
server = connection.getAdmin(loc.getHostname(), loc.getPort());
server = connection.getAdmin(loc.getServerName());
int sfCount = ProtobufUtil.getStoreFiles(
server, regionName, FAMILY).size();
assertTrue(sfCount > 1);

@@ -236,8 +234,7 @@ public class TestFromClientSide3 {
loc = hTable.getRegionLocation(row, true);
regionName = loc.getRegionInfo().getRegionName();
try {
server = connection.getAdmin(loc.getHostname(), loc
.getPort());
server = connection.getAdmin(loc.getServerName());
if (ProtobufUtil.getStoreFiles(
server, regionName, FAMILY).size() < sfCount) {
break;
@@ -151,8 +151,8 @@ public class TestHCM {

final int nextPort = conn.getCachedLocation(TABLE_NAME, ROW).getPort() + 1;
HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW);
conn.updateCachedLocation(loc.getRegionInfo(), loc, "127.0.0.1", nextPort,
HConstants.LATEST_TIMESTAMP);
conn.updateCachedLocation(loc.getRegionInfo(), loc, new ServerName("127.0.0.1", nextPort,
HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP);
Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW).getPort(), nextPort);

conn.forceDeleteCachedLocation(TABLE_NAME.clone(), ROW.clone());

@@ -349,34 +349,34 @@ public class TestHCM {
HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW);
assertNotNull(location);

HRegionLocation anySource = new HRegionLocation(location.getRegionInfo(),
location.getHostname(), location.getPort() - 1);
HRegionLocation anySource = new HRegionLocation(location.getRegionInfo(), new ServerName(
location.getHostname(), location.getPort() - 1, 0L), HConstants.NO_SEQNUM);

// Same server as already in cache reporting - overwrites any value despite seqNum.
int nextPort = location.getPort() + 1;
conn.updateCachedLocation(location.getRegionInfo(), location,
"127.0.0.1", nextPort, location.getSeqNum() - 1);
new ServerName("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
location = conn.getCachedLocation(TABLE_NAME2, ROW);
Assert.assertEquals(nextPort, location.getPort());

// No source specified - same.
nextPort = location.getPort() + 1;
conn.updateCachedLocation(location.getRegionInfo(), location,
"127.0.0.1", nextPort, location.getSeqNum() - 1);
new ServerName("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
location = conn.getCachedLocation(TABLE_NAME2, ROW);
Assert.assertEquals(nextPort, location.getPort());

// Higher seqNum - overwrites lower seqNum.
nextPort = location.getPort() + 1;
conn.updateCachedLocation(location.getRegionInfo(), anySource,
"127.0.0.1", nextPort, location.getSeqNum() + 1);
new ServerName("127.0.0.1", nextPort, 0), location.getSeqNum() + 1);
location = conn.getCachedLocation(TABLE_NAME2, ROW);
Assert.assertEquals(nextPort, location.getPort());

// Lower seqNum - does not overwrite higher seqNum.
nextPort = location.getPort() + 1;
conn.updateCachedLocation(location.getRegionInfo(), anySource,
"127.0.0.1", nextPort, location.getSeqNum() - 1);
new ServerName("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
location = conn.getCachedLocation(TABLE_NAME2, ROW);
Assert.assertEquals(nextPort - 1, location.getPort());
}
@@ -72,7 +72,7 @@ public class TestMultiParallel {
UTIL.startMiniCluster(slaves);
HTable t = UTIL.createTable(Bytes.toBytes(TEST_TABLE), Bytes.toBytes(FAMILY));
UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY));
UTIL.waitTableAvailable(Bytes.toBytes(TEST_TABLE), 15 * 1000);
UTIL.waitTableEnabled(Bytes.toBytes(TEST_TABLE));
t.close();
}

@@ -637,7 +637,7 @@ public class TestClassLoading {

private void waitForTable(byte[] name) throws InterruptedException, IOException {
// First wait until all regions are online
TEST_UTIL.waitTableEnabled(name, 5000);
TEST_UTIL.waitTableEnabled(name);
// Now wait a bit longer for the coprocessor hosts to load the CPs
Thread.sleep(1000);
}

@@ -439,7 +439,7 @@ public class TestHFileOutputFormat {
LOG.info("Waiting for table to disable");
}
admin.enableTable(TABLE_NAME);
util.waitTableAvailable(TABLE_NAME, 30000);
util.waitTableAvailable(TABLE_NAME);
assertEquals("Data should remain after reopening of regions",
tableDigestBefore, util.checksumRows(table));
} finally {

@@ -153,7 +153,7 @@ public class TestLoadIncrementalHFiles {
admin.createTable(htd, SPLIT_KEYS);

HTable table = new HTable(util.getConfiguration(), TABLE);
util.waitTableAvailable(TABLE, 30000);
util.waitTableEnabled(TABLE);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration(), useSecure);
loader.doBulkLoad(dir, table);

@@ -184,7 +184,7 @@ public class TestLoadIncrementalHFiles {
admin.createTable(htd, SPLIT_KEYS);

HTable table = new HTable(util.getConfiguration(), TABLE);
util.waitTableAvailable(TABLE, 30000);
util.waitTableEnabled(TABLE);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
util.getConfiguration());

@@ -37,10 +37,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.HConnection;

@@ -269,7 +271,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
Mockito.doNothing().when(c).close();
// Make it so we return a particular location when asked.
final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
"example.org", 1234);
new ServerName("example.org", 1234, 0), HConstants.NO_SEQNUM);
Mockito.when(c.getRegionLocation((byte[]) Mockito.any(),
(byte[]) Mockito.any(), Mockito.anyBoolean())).
thenReturn(loc);

@@ -278,7 +280,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
ClientProtocol hri = Mockito.mock(ClientProtocol.class);
Mockito.when(hri.bulkLoadHFile((RpcController)Mockito.any(), (BulkLoadHFileRequest)Mockito.any())).
thenThrow(new ServiceException(new IOException("injecting bulk load error")));
Mockito.when(c.getClient(Mockito.anyString(), Mockito.anyInt())).
Mockito.when(c.getClient(Mockito.any(ServerName.class))).
thenReturn(hri);
return c;
}

@@ -49,7 +49,7 @@ public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles{
util.startMiniCluster();

// Wait for the ACL table to become available
util.waitTableAvailable(AccessControlLists.ACL_TABLE_NAME, 5000);
util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
}

}

@@ -54,7 +54,7 @@ public class TestSecureLoadIncrementalHFilesSplitRecovery extends TestLoadIncrem
util.startMiniCluster();

// Wait for the ACL table to become available
util.waitTableAvailable(AccessControlLists.ACL_TABLE_NAME, 5000);
util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
}

//Disabling this test as it does not work in secure mode
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;

@@ -499,4 +500,9 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
// TODO Auto-generated method stub
return null;
}

@Override
public ExecutorService getExecutorService() {
return null;
}
}

@@ -97,7 +97,7 @@ public class TestRestartCluster {
UTIL.createTable(TABLE, FAMILY);
}
for(byte [] TABLE : TABLES) {
UTIL.waitTableAvailable(TABLE, 30000);
UTIL.waitTableEnabled(TABLE);
}

List<HRegionInfo> allRegions =

@@ -128,7 +128,7 @@ public class TestRestartCluster {
} catch(TableExistsException tee) {
LOG.info("Table already exists as expected");
}
UTIL.waitTableAvailable(TABLE, 30000);
UTIL.waitTableAvailable(TABLE);
}
}

@@ -589,10 +589,9 @@ public class TestCompaction extends HBaseTestCase {
HStore store = (HStore) r.getStore(COLUMN_FAMILY);

Collection<StoreFile> storeFiles = store.getStorefiles();
Compactor tool = store.compactionPolicy.getCompactor();
Compactor tool = store.compactor;

List<Path> newFiles =
tool.compact(storeFiles, false);
List<Path> newFiles = tool.compact(storeFiles, false);

// Now lets corrupt the compacted file.
FileSystem fs = FileSystem.get(conf);

@@ -288,7 +288,7 @@ public class TestDefaultCompactSelection extends TestCase {
compactEquals(sfCreate(100,50,23,12,12), true, 23, 12, 12);
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1);
conf.setFloat("hbase.hregion.majorcompaction.jitter", 0);
store.compactionPolicy.updateConfiguration();
store.compactionPolicy.setConf(conf);
try {
// trigger an aged major compaction
compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12);

@@ -321,7 +321,7 @@ public class TestDefaultCompactSelection extends TestCase {
*/
// set an off-peak compaction threshold
this.conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
store.compactionPolicy.updateConfiguration();
store.compactionPolicy.setConf(this.conf);
// Test with and without the flag.
compactEquals(sfCreate(999, 50, 12, 12, 1), false, true, 50, 12, 12, 1);
compactEquals(sfCreate(999, 50, 12, 12, 1), 12, 12, 1);

@@ -165,8 +165,7 @@ public class TestHRegionServerBulkLoad {
public Void call() throws Exception {
LOG.debug("compacting " + location + " for row "
+ Bytes.toStringBinary(row));
AdminProtocol server = connection.getAdmin(
location.getHostname(), location.getPort());
AdminProtocol server = connection.getAdmin(location.getServerName());
CompactRegionRequest request =
RequestConverter.buildCompactRegionRequest(
location.getRegionInfo().getRegionName(), true, null);

@@ -568,6 +568,7 @@ public class TestStoreScanner extends TestCase {
assertEquals(kvs[14], results.get(5));
assertEquals(kvs[15], results.get(6));
assertEquals(7, results.size());
scanner.close();
}finally{
EnvironmentEdgeManagerTestHelper.reset();
}
@@ -26,8 +26,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@@ -61,11 +64,11 @@ public class PerfTestCompactionPolicies {
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{new DefaultCompactionPolicy(), 3, 2, 1.2f},
{new DefaultCompactionPolicy(), 4, 2, 1.2f},
{new DefaultCompactionPolicy(), 5, 2, 1.2f},
{new DefaultCompactionPolicy(), 4, 2, 1.3f},
{new DefaultCompactionPolicy(), 4, 2, 1.4f},
{DefaultCompactionPolicy.class, 3, 2, 1.2f},
{DefaultCompactionPolicy.class, 4, 2, 1.2f},
{DefaultCompactionPolicy.class, 5, 2, 1.2f},
{DefaultCompactionPolicy.class, 4, 2, 1.3f},
{DefaultCompactionPolicy.class, 4, 2, 1.4f},

});
}

@@ -77,7 +80,8 @@ public class PerfTestCompactionPolicies {
* @param min The min number of files to compact
* @param ratio The ratio that files must be under to be compacted.
*/
public PerfTestCompactionPolicies(CompactionPolicy cp, int max, int min, float ratio) {
public PerfTestCompactionPolicies(Class<? extends CompactionPolicy> cpClass,
int max, int min, float ratio) {
this.max = max;
this.min = min;
this.ratio = ratio;

@@ -86,11 +90,7 @@ public class PerfTestCompactionPolicies {
org.apache.log4j.Logger.getLogger(CompactionConfiguration.class).
setLevel(org.apache.log4j.Level.ERROR);

org.apache.log4j.Logger.getLogger(cp.getClass()).
setLevel(org.apache.log4j.Level.ERROR);

this.cp = cp;

org.apache.log4j.Logger.getLogger(cpClass).setLevel(org.apache.log4j.Level.ERROR);

Configuration configuration = HBaseConfiguration.create();

@@ -99,11 +99,10 @@ public class PerfTestCompactionPolicies {
configuration.setInt("hbase.hstore.compaction.min", min);
configuration.setFloat("hbase.hstore.compaction.ratio", ratio);

cp.store = createMockStore();

//Now set the conf.
cp.setConf(configuration);

HStore store = createMockStore();
this.cp = ReflectionUtils.instantiateWithCustomCtor(cpClass.getName(),
new Class[] { Configuration.class, StoreConfigInformation.class },
new Object[] { configuration, store });

//Used for making paths
random = new Random(42);
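Note: the test above now builds its policy through ReflectionUtils.instantiateWithCustomCtor, i.e. it resolves the class by name and picks the constructor matching the given argument types. A self-contained sketch of that reflection shape follows; the FixedRatioPolicy class and its arguments are made up for illustration only.

import java.lang.reflect.Constructor;

public class CustomCtorSketch {
  public static class FixedRatioPolicy {
    private final int maxFiles;
    private final float ratio;
    public FixedRatioPolicy(int maxFiles, float ratio) {
      this.maxFiles = maxFiles;
      this.ratio = ratio;
    }
    public String toString() { return "maxFiles=" + maxFiles + ", ratio=" + ratio; }
  }

  public static void main(String[] args) throws Exception {
    // Resolve the class by name, then pick the constructor that matches the argument
    // types, the same shape as instantiateWithCustomCtor(className, ctorArgTypes, ctorArgs).
    String className = FixedRatioPolicy.class.getName();
    Class<?>[] ctorArgTypes = new Class<?>[] { int.class, float.class };
    Object[] ctorArgs = new Object[] { 4, 1.2f };
    Constructor<?> ctor = Class.forName(className).getDeclaredConstructor(ctorArgTypes);
    Object policy = ctor.newInstance(ctorArgs);
    System.out.println("Instantiated: " + policy);
  }
}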
@@ -74,7 +74,7 @@ public class TestAccessControlFilter {
conf.set("hbase.superuser", conf.get("hbase.superuser", "") +
String.format(",%s.hfs.0,%s.hfs.1,%s.hfs.2", baseuser, baseuser, baseuser));
TEST_UTIL.startMiniCluster();
TEST_UTIL.waitTableAvailable(AccessControlLists.ACL_TABLE_NAME, 5000);
TEST_UTIL.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);

ADMIN = User.createUserForTesting(conf, "admin", new String[]{"supergroup"});
READER = User.createUserForTesting(conf, "reader", new String[0]);

@@ -140,7 +140,7 @@ public class TestAccessController {
Coprocessor.PRIORITY_HIGHEST, 1, conf);

// Wait for the ACL table to become available
TEST_UTIL.waitTableAvailable(AccessControlLists.ACL_TABLE_NAME, 5000);
TEST_UTIL.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);

// create a set of test users
SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" });

@@ -824,7 +824,7 @@ public class TestAccessController {

HTable table = new HTable(conf, tableName);
try {
TEST_UTIL.waitTableAvailable(tableName, 30000);
TEST_UTIL.waitTableEnabled(tableName);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
loader.doBulkLoad(loadPath, table);
} finally {

@@ -93,7 +93,7 @@ public class TestTablePermissions {
UTIL.startMiniCluster();

// Wait for the ACL table to become available
UTIL.waitTableAvailable(AccessControlLists.ACL_TABLE_NAME, 5000);
UTIL.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);

ZKW = new ZooKeeperWatcher(UTIL.getConfiguration(),
"TestTablePermissions", ABORTABLE);

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;

@@ -172,4 +173,9 @@ public class MockRegionServerServices implements RegionServerServices {
// TODO Auto-generated method stub
return null;
}

@Override
public ExecutorService getExecutorService() {
return null;
}
}
@@ -19,7 +19,10 @@ package org.apache.hadoop.hbase.util;
// this is deliberately not in the o.a.h.h.regionserver package
// in order to make sure all required classes/method are available

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

@@ -59,7 +62,12 @@ import org.junit.experimental.categories.Category;

import static org.junit.Assert.*;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@Category(MediumTests.class)
@RunWith(Parameterized.class)
public class TestCoprocessorScanPolicy {
final Log LOG = LogFactory.getLog(getClass());
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

@@ -67,7 +75,6 @@ public class TestCoprocessorScanPolicy {
private static final byte[] Q = Bytes.toBytes("qual");
private static final byte[] R = Bytes.toBytes("row");


@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();

@@ -81,9 +88,22 @@ public class TestCoprocessorScanPolicy {
TEST_UTIL.shutdownMiniCluster();
}

@Parameters
public static Collection<Object[]> parameters() {
return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
}

public TestCoprocessorScanPolicy(boolean parallelSeekEnable) {
TEST_UTIL.getMiniHBaseCluster().getConf()
.setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, parallelSeekEnable);
}

@Test
public void testBaseCases() throws Exception {
byte[] tableName = Bytes.toBytes("baseCases");
if (TEST_UTIL.getHBaseAdmin().tableExists(tableName)) {
TEST_UTIL.deleteTable(tableName);
}
HTable t = TEST_UTIL.createTable(tableName, F, 1);
// set the version override to 2
Put p = new Put(R);

@@ -130,6 +150,9 @@ public class TestCoprocessorScanPolicy {
@Test
public void testTTL() throws Exception {
byte[] tableName = Bytes.toBytes("testTTL");
if (TEST_UTIL.getHBaseAdmin().tableExists(tableName)) {
TEST_UTIL.deleteTable(tableName);
}
HTableDescriptor desc = new HTableDescriptor(tableName);
HColumnDescriptor hcd = new HColumnDescriptor(F)
.setMaxVersions(10)

@@ -505,7 +505,7 @@ public class TestHBaseFsck {
HConnection connection = admin.getConnection();
for (ServerName hsi : regionServers) {
AdminProtocol server =
connection.getAdmin(hsi.getHostname(), hsi.getPort());
connection.getAdmin(hsi);

// list all online regions from this region server
List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(server);
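Note: TestCoprocessorScanPolicy above is converted to a JUnit 4 parameterized test so every case runs once with parallel seek off and once with it on. A minimal sketch of that runner setup follows; the class name and the trivial assertion are illustrative only, and JUnit 4 is assumed to be on the classpath.

import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class ParallelSeekToggleTest {
  @Parameters
  public static Collection<Object[]> parameters() {
    // Run the same test body once with the feature off and once with it on,
    // similar in spirit to HBaseTestingUtility.BOOLEAN_PARAMETERIZED above.
    return Arrays.asList(new Object[][] { { false }, { true } });
  }

  private final boolean parallelSeekEnabled;

  public ParallelSeekToggleTest(boolean parallelSeekEnabled) {
    this.parallelSeekEnabled = parallelSeekEnabled;
  }

  @Test
  public void flagIsABoolean() {
    assertTrue(parallelSeekEnabled || !parallelSeekEnabled);
  }
}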
@@ -221,18 +221,48 @@ to ensure well-formedness of your document after an edit session.
xlink:href="http://hadoop.apache.org">Hadoop</link><indexterm>
<primary>Hadoop</primary>
</indexterm></title>
<note><title>Please read all of this section</title>
<para>Please read this section to the end. Up front we
wade through the weeds of Hadoop versions. Later we talk of what you must do in HBase
to make it work w/ a particular Hadoop version.</para>
</note>
<para>Selecting a Hadoop version is critical for your HBase deployment. Below table shows some information about what versions of Hadoop are supported by various HBase versions. Based on the version of HBase, you should select the most appropriate version of Hadoop. We are not in the Hadoop distro selection business. You can use Hadoop distributions from Apache, or learn about vendor distributions of Hadoop at <link xlink:href="http://wiki.apache.org/hadoop/Distributions%20and%20Commercial%20Support"/></para>
<para>
<table>
<title>Hadoop version support matrix</title>
<tgroup cols='4' align='left' colsep='1' rowsep='1'><colspec colname='c1' align='left'/><colspec colname='c2' align='center'/><colspec colname='c3' align='center'/><colspec colname='c4' align='center'/>
<thead>
<row><entry> </entry><entry>HBase-0.92.x</entry><entry>HBase-0.94.x</entry><entry>HBase-0.96</entry></row>
</thead><tbody>
<row><entry>Hadoop-0.20.205</entry><entry>S</entry> <entry>S</entry> <entry>X</entry></row>
<row><entry>Hadoop-0.22.x </entry><entry>S</entry> <entry>S</entry> <entry>X</entry></row>
<row><entry>Hadoop-1.0.x </entry><entry>S</entry> <entry>S</entry> <entry>S</entry></row>
<row><entry>Hadoop-1.1.x </entry><entry>NT</entry> <entry>S</entry> <entry>S</entry></row>
<row><entry>Hadoop-0.23.x </entry><entry>X</entry> <entry>S</entry> <entry>NT</entry></row>
<row><entry>Hadoop-2.x </entry><entry>X</entry> <entry>S</entry> <entry>S</entry></row>
</tbody></tgroup></table>

Where
<simplelist type='vert' columns='1'>
<member>S = supported and tested,</member>
<member>X = not supported,</member>
<member>NT = it should run, but not tested enough.</member>
</simplelist>
</para>
<para>
Because HBase depends on Hadoop, it bundles an instance of the Hadoop jar under its <filename>lib</filename> directory. The bundled jar is ONLY for use in standalone mode. In distributed mode, it is <emphasis>critical</emphasis> that the version of Hadoop that is out on your cluster match what is under HBase. Replace the hadoop jar found in the HBase lib directory with the hadoop jar you are running on your cluster to avoid version mismatch issues. Make sure you replace the jar in HBase everywhere on your cluster. Hadoop version mismatch issues have various manifestations but often all looks like its hung up.
</para>
<section xml:id="hadoop.hbase-0.94">
<title>Apache HBase 0.92 and 0.94</title>
<para>HBase 0.92 and 0.94 versions can work with Hadoop versions, 0.20.205, 0.22.x, 1.0.x, and 1.1.x. HBase-0.94 can additionally work with Hadoop-0.23.x and 2.x, but you may have to recompile the code using the specific maven profile (see top level pom.xml)</para>
</section>

<section xml:id="hadoop.hbase-0.96">
<title>Apache HBase 0.96</title>
<para>Apache HBase 0.96.0 requires Apache Hadoop 1.x at a minimum, and it can run equally well on hadoop-2.0.
As of Apache HBase 0.96.x, Apache Hadoop 1.0.x at least is required. We will no longer run properly on older Hadoops such as 0.20.205 or branch-0.20-append. Do not move to Apache HBase 0.96.x if you cannot upgrade your Hadoop<footnote><para>See <link xlink:href="http://search-hadoop.com/m/7vFVx4EsUb2">HBase, mail # dev - DISCUSS: Have hbase require at least hadoop 1.0.0 in hbase 0.96.0?</link></para></footnote>.</para>
</section>

<section xml:id="hadoop.older.versions">
<title>Hadoop versions 0.20.x - 1.x</title>
<para>
HBase will lose data unless it is running on an HDFS that has a durable
<code>sync</code> implementation. Hadoop 0.20.2, Hadoop 0.20.203.0, and Hadoop 0.20.204.0
DO NOT have this attribute.
Currently only Hadoop versions 0.20.205.x or any release in excess of this
version -- this includes hadoop 1.0.0 -- have a working, durable sync
<code>sync</code> implementation. DO NOT use Hadoop 0.20.2, Hadoop 0.20.203.0, and Hadoop 0.20.204.0 which DO NOT have this attribute. Currently only Hadoop versions 0.20.205.x or any release in excess of this version -- this includes hadoop-1.0.0 -- have a working, durable sync
<footnote>
<para>The Cloudera blog post <link xlink:href="http://www.cloudera.com/blog/2012/01/an-update-on-apache-hadoop-1-0/">An update on Apache Hadoop 1.0</link>
by Charles Zedlweski has a nice exposition on how all the Hadoop versions relate.

@@ -252,73 +282,13 @@ to ensure well-formedness of your document after an edit session.
</programlisting>
You will have to restart your cluster after making this edit. Ignore the chicken-little
comment you'll find in the <filename>hdfs-default.xml</filename> in the
description for the <varname>dfs.support.append</varname> configuration; it says it is not enabled because there
are <quote>... bugs in the 'append code' and is not supported in any production
cluster.</quote>. This comment is stale, from another era, and while I'm sure there
are bugs, the sync/append code has been running
in production at large scale deploys and is on
by default in the offerings of hadoop by commercial vendors
<footnote><para>Until recently only the
<link xlink:href="http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/">branch-0.20-append</link>
branch had a working sync but no official release was ever made from this branch.
You had to build it yourself. Michael Noll wrote a detailed blog,
<link xlink:href="http://www.michael-noll.com/blog/2011/04/14/building-an-hadoop-0-20-x-version-for-hbase-0-90-2/">Building
an Hadoop 0.20.x version for Apache HBase 0.90.2</link>, on how to build an
Hadoop from branch-0.20-append. Recommended.</para></footnote>
<footnote><para>Praveen Kumar has written
a complimentary article,
<link xlink:href="http://praveen.kumar.in/2011/06/20/building-hadoop-and-hbase-for-hbase-maven-application-development/">Building Hadoop and HBase for HBase Maven application development</link>.
</para></footnote><footnote>Cloudera have <varname>dfs.support.append</varname> set to true by default.</footnote>.
Please use the most up-to-date Hadoop possible.</para>
<note><title>Apache HBase 0.96.0 requires Apache Hadoop 1.0.0 at a minimum</title>
<para>As of Apache HBase 0.96.x, Apache Hadoop 1.0.x at least is required. We will no
longer run properly on older Hadoops such as <filename>0.20.205</filename> or <filename>branch-0.20-append</filename>.
Do not move to Apache HBase 0.96.x if you cannot upgrade your Hadoop<footnote><para>See <link xlink:href="http://search-hadoop.com/m/7vFVx4EsUb2">HBase, mail # dev - DISCUSS: Have hbase require at least hadoop 1.0.0 in hbase 0.96.0?</link></para></footnote>.</para>
<para>Apache HBase 0.96.0 runs on Apache Hadoop 2.0.
description for the <varname>dfs.support.append</varname> configuration.
</para>
</note>

<para>Or use the
<link xlink:href="http://www.cloudera.com/">Cloudera</link> or
<link xlink:href="http://www.mapr.com/">MapR</link> distributions.
Cloudera' <link xlink:href="http://archive.cloudera.com/docs/">CDH3</link>
is Apache Hadoop 0.20.x plus patches including all of the
<link xlink:href="http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/">branch-0.20-append</link>
additions needed to add a durable sync. Use the released, most recent version of CDH3. In CDH, append
support is enabled by default so you do not need to make the above mentioned edits to
<filename>hdfs-site.xml</filename> or to <filename>hbase-site.xml</filename>.</para>
<para>
<link xlink:href="http://www.mapr.com/">MapR</link>
includes a commercial, reimplementation of HDFS.
It has a durable sync as well as some other interesting features that are not
yet in Apache Hadoop. Their <link xlink:href="http://www.mapr.com/products/mapr-editions/m3-edition">M3</link>
product is free to use and unlimited.
</para>

<para>Because HBase depends on Hadoop, it bundles an instance of the
Hadoop jar under its <filename>lib</filename> directory. The bundled jar is ONLY for use in standalone mode.
In distributed mode, it is <emphasis>critical</emphasis> that the version of Hadoop that is out
on your cluster match what is under HBase. Replace the hadoop jar found in the HBase
<filename>lib</filename> directory with the hadoop jar you are running on
your cluster to avoid version mismatch issues. Make sure you
replace the jar in HBase everywhere on your cluster. Hadoop version
mismatch issues have various manifestations but often all looks like
its hung up.</para>
<note xml:id="bigtop"><title>Packaging and Apache BigTop</title>
<para><link xlink:href="http://bigtop.apache.org">Apache Bigtop</link>
is an umbrella for packaging and tests of the Apache Hadoop
ecosystem, including Apache HBase. Bigtop performs testing at various
levels (packaging, platform, runtime, upgrade, etc...), developed by a
community, with a focus on the system as a whole, rather than individual
projects. We recommend installing Apache HBase packages as provided by a
Bigtop release rather than rolling your own piecemeal integration of
various component releases.</para>
</note>

</section>
<section xml:id="hadoop.security">
<title>Apache HBase on Secure Hadoop</title>
<para>Apache HBase will run on any Hadoop 0.20.x that incorporates Hadoop
security features -- e.g. Y! 0.20S or CDH3B3 -- as long as you do as
security features as long as you do as
suggested above and replace the Hadoop jar that ships with HBase
with the secure version. If you want to read more about how to setup
Secure HBase, see <xref linkend="hbase.secure.configuration" />.</para>
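Note: the documentation above (hadoop.older.versions) turns on dfs.support.append in hdfs-site.xml on older 0.20.x Hadoops before relying on a durable sync. A small sketch for checking how that flag resolves on a given node follows; it assumes hadoop-common is on the classpath and that an hdfs-site.xml, if any, is visible on the classpath.

import org.apache.hadoop.conf.Configuration;

public class DurableSyncConfigCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.addResource("hdfs-site.xml"); // picked up from the classpath if present
    // The flag discussed in the section above; older 0.20.x releases needed it
    // enabled explicitly before HBase could rely on a durable sync.
    boolean append = conf.getBoolean("dfs.support.append", false);
    System.out.println("dfs.support.append = " + append);
  }
}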