HBASE-623 migration script for hbase-82

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@658016 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2008-05-19 22:46:50 +00:00
parent db289cb669
commit 274ed7384b
15 changed files with 474 additions and 124 deletions

View File

@ -15,6 +15,7 @@ Hbase Change Log
HBASE-624 Master will shut down if number of active region servers is zero HBASE-624 Master will shut down if number of active region servers is zero
even if shutdown was not requested even if shutdown was not requested
HBASE-629 Split reports incorrect elapsed time HBASE-629 Split reports incorrect elapsed time
HBASE-623 Migration script for hbase-82
IMPROVEMENTS IMPROVEMENTS
HBASE-559 MR example job to count table rows HBASE-559 MR example job to count table rows

View File

@ -30,6 +30,7 @@
<property name="src.dir" location="${basedir}/src/java"/> <property name="src.dir" location="${basedir}/src/java"/>
<property name="src.test" location="${basedir}/src/test"/> <property name="src.test" location="${basedir}/src/test"/>
<property name="src.testdata" location="${basedir}/src/testdata"/>
<property name="src.examples" location="${basedir}/src/examples"/> <property name="src.examples" location="${basedir}/src/examples"/>
<property name="src.webapps" location="${basedir}/src/webapps"/> <property name="src.webapps" location="${basedir}/src/webapps"/>
@ -379,6 +380,7 @@
<sysproperty key="test.build.data" value="${build.test}/data"/> <sysproperty key="test.build.data" value="${build.test}/data"/>
<sysproperty key="build.test" value="${build.test}"/> <sysproperty key="build.test" value="${build.test}"/>
<sysproperty key="src.testdata" value="${src.testdata}"/>
<sysproperty key="contrib.name" value="${name}"/> <sysproperty key="contrib.name" value="${name}"/>
<sysproperty key="user.dir" value="${build.test}/data"/> <sysproperty key="user.dir" value="${build.test}/data"/>

View File

@ -335,6 +335,9 @@ public class HColumnDescriptor implements WritableComparable {
Text t = new Text(); Text t = new Text();
t.readFields(in); t.readFields(in);
this.name = t.getBytes(); this.name = t.getBytes();
if (HStoreKey.getFamilyDelimiterIndex(this.name) > 0) {
this.name = stripColon(this.name);
}
} else { } else {
this.name = Bytes.readByteArray(in); this.name = Bytes.readByteArray(in);
} }

View File

@ -202,4 +202,8 @@ public interface HConstants {
* Unlimited time-to-live. * Unlimited time-to-live.
*/ */
static final int FOREVER = -1; static final int FOREVER = -1;
}
public static final String HBASE_CLIENT_RETRIES_NUMBER_KEY =
"hbase.client.retries.number";
public static final int DEFAULT_CLIENT_RETRIES = 5;
}

View File

@ -157,19 +157,16 @@ public class HConnectionManager implements HConstants {
this.masterChecked = false; this.masterChecked = false;
this.servers = new ConcurrentHashMap<String, HRegionInterface>(); this.servers = new ConcurrentHashMap<String, HRegionInterface>();
} }
/** {@inheritDoc} */ /** {@inheritDoc} */
public HMasterInterface getMaster() throws MasterNotRunningException { public HMasterInterface getMaster() throws MasterNotRunningException {
HServerAddress masterLocation = null;
synchronized (this.masterLock) { synchronized (this.masterLock) {
for (int tries = 0; for (int tries = 0; !this.closed &&
!this.closed && !this.masterChecked && this.master == null && tries < numRetries;
!this.masterChecked && this.master == null && tries++) {
tries < numRetries; String m = this.conf.get(MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS);
tries++) { masterLocation = new HServerAddress(m);
HServerAddress masterLocation = new HServerAddress(this.conf.get(
MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS));
try { try {
HMasterInterface tryMaster = (HMasterInterface)HbaseRPC.getProxy( HMasterInterface tryMaster = (HMasterInterface)HbaseRPC.getProxy(
HMasterInterface.class, HMasterInterface.versionID, HMasterInterface.class, HMasterInterface.versionID,
@ -181,12 +178,12 @@ public class HConnectionManager implements HConstants {
} }
} catch (IOException e) { } catch (IOException e) {
if(tries == numRetries - 1) { if (tries == numRetries - 1) {
// This was our last chance - don't bother sleeping // This was our last chance - don't bother sleeping
break; break;
} }
LOG.info("Attempt " + tries + " of " + this.numRetries + LOG.info("Attempt " + tries + " of " + this.numRetries +
" failed with <" + e + ">. Retrying after sleep of " + this.pause); " failed with <" + e + ">. Retrying after sleep of " + this.pause);
} }
// We either cannot connect to master or it is not running. Sleep & retry // We either cannot connect to master or it is not running. Sleep & retry
@ -200,7 +197,8 @@ public class HConnectionManager implements HConstants {
this.masterChecked = true; this.masterChecked = true;
} }
if (this.master == null) { if (this.master == null) {
throw new MasterNotRunningException(); throw new MasterNotRunningException(masterLocation == null? "":
masterLocation.toString());
} }
return this.master; return this.master;
} }
@ -323,12 +321,11 @@ public class HConnectionManager implements HConstants {
// This block guards against two threads trying to find the root // This block guards against two threads trying to find the root
// region at the same time. One will go do the find while the // region at the same time. One will go do the find while the
// second waits. The second thread will not do find. // second waits. The second thread will not do find.
if (!useCache || rootRegionLocation == null) { if (!useCache || rootRegionLocation == null) {
return locateRootRegion(); return locateRootRegion();
} }
return rootRegionLocation; return rootRegionLocation;
} }
} else if (Bytes.equals(tableName, META_TABLE_NAME)) { } else if (Bytes.equals(tableName, META_TABLE_NAME)) {
synchronized (metaRegionLock) { synchronized (metaRegionLock) {
// This block guards against two threads trying to load the meta // This block guards against two threads trying to load the meta
@ -655,30 +652,29 @@ public class HConnectionManager implements HConstants {
*/ */
private HRegionLocation locateRootRegion() private HRegionLocation locateRootRegion()
throws IOException { throws IOException {
getMaster(); getMaster();
HServerAddress rootRegionAddress = null; HServerAddress rootRegionAddress = null;
for (int tries = 0; tries < numRetries; tries++) { for (int tries = 0; tries < numRetries; tries++) {
int localTimeouts = 0; int localTimeouts = 0;
// Ask the master which server has the root region
// ask the master which server has the root region
while (rootRegionAddress == null && localTimeouts < numRetries) { while (rootRegionAddress == null && localTimeouts < numRetries) {
rootRegionAddress = master.findRootRegion(); rootRegionAddress = master.findRootRegion();
if (rootRegionAddress == null) { if (rootRegionAddress == null) {
try { // Increment and then only sleep if retries left.
if (LOG.isDebugEnabled()) { if (++localTimeouts < numRetries) {
LOG.debug("Sleeping. Waiting for root region."); try {
if (LOG.isDebugEnabled()) {
LOG.debug("Sleeping " + pause + "ms. Waiting for root "
+ "region. Attempt " + tries + " of " + numRetries);
}
Thread.sleep(pause);
if (LOG.isDebugEnabled()) {
LOG.debug("Wake. Retry finding root region.");
}
} catch (InterruptedException iex) {
// continue
} }
Thread.sleep(pause);
if (LOG.isDebugEnabled()) {
LOG.debug("Wake. Retry finding root region.");
}
} catch (InterruptedException iex) {
// continue
} }
localTimeouts++;
} }
} }

View File

@ -525,6 +525,23 @@ public class HTable implements HConstants {
); );
} }
/**
* Get a scanner on the current table starting at first row.
* Return the specified columns.
*
* @param columns columns to scan. If column name is a column family, all
* columns of the specified column family are returned. Its also possible
* to pass a regex in the column qualifier. A column qualifier is judged to
* be a regex if it contains at least one of the following characters:
* <code>\+|^&*$[]]}{)(</code>.
* @return scanner
* @throws IOException
*/
public Scanner getScanner(final Text [] columns)
throws IOException {
return getScanner(Bytes.toByteArrays(columns), HConstants.EMPTY_START_ROW);
}
/** /**
* Get a scanner on the current table starting at the specified row. * Get a scanner on the current table starting at the specified row.
* Return the specified columns. * Return the specified columns.
@ -542,7 +559,25 @@ public class HTable implements HConstants {
throws IOException { throws IOException {
return getScanner(Bytes.toByteArrays(columns), startRow.getBytes()); return getScanner(Bytes.toByteArrays(columns), startRow.getBytes());
} }
/**
* Get a scanner on the current table starting at first row.
* Return the specified columns.
*
* @param columns columns to scan. If column name is a column family, all
* columns of the specified column family are returned. Its also possible
* to pass a regex in the column qualifier. A column qualifier is judged to
* be a regex if it contains at least one of the following characters:
* <code>\+|^&*$[]]}{)(</code>.
* @return scanner
* @throws IOException
*/
public Scanner getScanner(final byte[][] columns)
throws IOException {
return getScanner(columns, HConstants.EMPTY_START_ROW,
HConstants.LATEST_TIMESTAMP, null);
}
/** /**
* Get a scanner on the current table starting at the specified row. * Get a scanner on the current table starting at the specified row.
* Return the specified columns. * Return the specified columns.
@ -875,7 +910,7 @@ public class HTable implements HConstants {
throws IOException { throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Creating scanner over " + Bytes.toString(tableName) + LOG.debug("Creating scanner over " + Bytes.toString(tableName) +
" starting at key '" + startRow + "'"); " starting at key '" + Bytes.toString(startRow) + "'");
} }
// save off the simple parameters // save off the simple parameters
this.columns = columns; this.columns = columns;
@ -921,7 +956,8 @@ public class HTable implements HConstants {
byte [] localStartKey = oldRegion == null? startRow: oldRegion.getEndKey(); byte [] localStartKey = oldRegion == null? startRow: oldRegion.getEndKey();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Advancing internal scanner to startKey " + localStartKey); LOG.debug("Advancing internal scanner to startKey at " +
Bytes.toString(localStartKey));
} }
try { try {

View File

@ -277,7 +277,7 @@ abstract class BaseScanner extends Chore implements HConstants {
result = true; result = true;
} else if (LOG.isDebugEnabled()) { } else if (LOG.isDebugEnabled()) {
// If debug, note we checked and current state of daughters. // If debug, note we checked and current state of daughters.
LOG.debug("Checked " + parent.getRegionName() + LOG.debug("Checked " + parent.getRegionNameAsString() +
" for references: splitA: " + hasReferencesA + ", splitB: "+ " for references: splitA: " + hasReferencesA + ", splitB: "+
hasReferencesB); hasReferencesB);
} }

View File

@ -943,7 +943,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
} }
for(HRegion region: regionsToClose) { for(HRegion region: regionsToClose) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("closing region " + region.getRegionName()); LOG.debug("closing region " + Bytes.toString(region.getRegionName()));
} }
try { try {
region.close(); region.close();

View File

@ -384,7 +384,7 @@ public class HStore implements HConstants {
HStoreFile curfile = null; HStoreFile curfile = null;
HStoreFile.Reference reference = null; HStoreFile.Reference reference = null;
if (isReference) { if (isReference) {
reference = readSplitInfo(p, fs); reference = HStoreFile.readSplitInfo(p, fs);
} }
curfile = new HStoreFile(conf, fs, basedir, info.getEncodedName(), curfile = new HStoreFile(conf, fs, basedir, info.getEncodedName(),
family.getName(), fid, reference); family.getName(), fid, reference);
@ -1773,21 +1773,6 @@ public class HStore implements HConstants {
return this.storeNameStr; return this.storeNameStr;
} }
/*
* @see writeSplitInfo(Path p, HStoreFile hsf, FileSystem fs)
*/
static HStoreFile.Reference readSplitInfo(final Path p, final FileSystem fs)
throws IOException {
FSDataInputStream in = fs.open(p);
try {
HStoreFile.Reference r = new HStoreFile.Reference();
r.readFields(in);
return r;
} finally {
in.close();
}
}
/** /**
* @param p Path to check. * @param p Path to check.
* @return True if the path has format of a HStoreFile reference. * @return True if the path has format of a HStoreFile reference.

View File

@ -281,7 +281,22 @@ public class HStoreFile implements HConstants {
out.close(); out.close();
} }
} }
/**
* @see #writeSplitInfo(FileSystem fs)
*/
static HStoreFile.Reference readSplitInfo(final Path p, final FileSystem fs)
throws IOException {
FSDataInputStream in = fs.open(p);
try {
HStoreFile.Reference r = new HStoreFile.Reference();
r.readFields(in);
return r;
} finally {
in.close();
}
}
private void createOrFail(final FileSystem fs, final Path p) private void createOrFail(final FileSystem fs, final Path p)
throws IOException { throws IOException {
if (fs.exists(p)) { if (fs.exists(p)) {
@ -585,6 +600,12 @@ public class HStoreFile implements HConstants {
/** {@inheritDoc} */ /** {@inheritDoc} */
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
// Write out the encoded region name as a String. Doing it as a String
// keeps a Reference's serialziation backword compatible with
// pre-HBASE-82 serializations. ALternative is rewriting all
// info files in hbase (Serialized References are written into the
// 'info' file that accompanies HBase Store files).
out.writeUTF(Integer.toString(encodedRegionName));
out.writeInt(this.encodedRegionName); out.writeInt(this.encodedRegionName);
out.writeLong(fileid); out.writeLong(fileid);
// Write true if we're doing top of the file. // Write true if we're doing top of the file.
@ -594,7 +615,7 @@ public class HStoreFile implements HConstants {
/** {@inheritDoc} */ /** {@inheritDoc} */
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
this.encodedRegionName = in.readInt(); this.encodedRegionName = Integer.parseInt(in.readUTF());
fileid = in.readLong(); fileid = in.readLong();
boolean tmp = in.readBoolean(); boolean tmp = in.readBoolean();
// If true, set region to top. // If true, set region to top.

View File

@ -11,6 +11,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
public class Bytes { public class Bytes {
/** /**
@ -40,8 +41,12 @@ public class Bytes {
*/ */
public static byte [] readByteArray(final DataInput in) public static byte [] readByteArray(final DataInput in)
throws IOException { throws IOException {
byte [] result = new byte[in.readInt()]; int len = WritableUtils.readVInt(in);
in.readFully(result, 0, result.length); if (len < 0) {
throw new NegativeArraySizeException(Integer.toString(len));
}
byte [] result = new byte[len];
in.readFully(result, 0, len);
return result; return result;
} }
@ -52,7 +57,7 @@ public class Bytes {
*/ */
public static void writeByteArray(final DataOutput out, final byte [] b) public static void writeByteArray(final DataOutput out, final byte [] b)
throws IOException { throws IOException {
out.writeInt(b.length); WritableUtils.writeVInt(out, b.length);
out.write(b, 0, b.length); out.write(b, 0, b.length);
} }

View File

@ -0,0 +1,266 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import junit.framework.TestCase;
/**
* Test that individual classes can migrate themselves.
*/
public class TestClassMigration extends TestCase {
/**
* Test we can migrate a 0.1 version of HSK.
* @throws Exception
*/
public void testMigrateHStoreKey() throws Exception {
long now = System.currentTimeMillis();
byte [] nameBytes = Bytes.toBytes(getName());
Text nameText = new Text(nameBytes);
HStoreKey01Branch hsk = new HStoreKey01Branch(nameText, nameText, now);
byte [] b = Writables.getBytes(hsk);
HStoreKey deserializedHsk =
(HStoreKey)Writables.getWritable(b, new HStoreKey());
assertEquals(deserializedHsk.getTimestamp(), hsk.getTimestamp());
assertTrue(Bytes.equals(nameBytes, deserializedHsk.getColumn()));
assertTrue(Bytes.equals(nameBytes, deserializedHsk.getRow()));
}
/**
* HBase 0.1 branch HStoreKey. Same in all regards except the utility
* methods have been removed.
* Used in test of HSK migration test.
*/
private static class HStoreKey01Branch implements WritableComparable {
/**
* Colon character in UTF-8
*/
public static final char COLUMN_FAMILY_DELIMITER = ':';
private Text row;
private Text column;
private long timestamp;
/** Default constructor used in conjunction with Writable interface */
public HStoreKey01Branch() {
this(new Text());
}
/**
* Create an HStoreKey specifying only the row
* The column defaults to the empty string and the time stamp defaults to
* Long.MAX_VALUE
*
* @param row - row key
*/
public HStoreKey01Branch(Text row) {
this(row, Long.MAX_VALUE);
}
/**
* Create an HStoreKey specifying the row and timestamp
* The column name defaults to the empty string
*
* @param row row key
* @param timestamp timestamp value
*/
public HStoreKey01Branch(Text row, long timestamp) {
this(row, new Text(), timestamp);
}
/**
* Create an HStoreKey specifying the row and column names
* The timestamp defaults to LATEST_TIMESTAMP
*
* @param row row key
* @param column column key
*/
public HStoreKey01Branch(Text row, Text column) {
this(row, column, HConstants.LATEST_TIMESTAMP);
}
/**
* Create an HStoreKey specifying all the fields
*
* @param row row key
* @param column column key
* @param timestamp timestamp value
*/
public HStoreKey01Branch(Text row, Text column, long timestamp) {
// Make copies by doing 'new Text(arg)'.
this.row = new Text(row);
this.column = new Text(column);
this.timestamp = timestamp;
}
/** @return Approximate size in bytes of this key. */
public long getSize() {
return this.row.getLength() + this.column.getLength() +
8 /* There is no sizeof in java. Presume long is 8 (64bit machine)*/;
}
/**
* Constructs a new HStoreKey from another
*
* @param other the source key
*/
public HStoreKey01Branch(HStoreKey01Branch other) {
this(other.row, other.column, other.timestamp);
}
/**
* Change the value of the row key
*
* @param newrow new row key value
*/
public void setRow(Text newrow) {
this.row.set(newrow);
}
/**
* Change the value of the column key
*
* @param newcol new column key value
*/
public void setColumn(Text newcol) {
this.column.set(newcol);
}
/**
* Change the value of the timestamp field
*
* @param timestamp new timestamp value
*/
public void setVersion(long timestamp) {
this.timestamp = timestamp;
}
/**
* Set the value of this HStoreKey from the supplied key
*
* @param k key value to copy
*/
public void set(HStoreKey01Branch k) {
this.row = k.getRow();
this.column = k.getColumn();
this.timestamp = k.getTimestamp();
}
/** @return value of row key */
public Text getRow() {
return row;
}
/** @return value of column key */
public Text getColumn() {
return column;
}
/** @return value of timestamp */
public long getTimestamp() {
return timestamp;
}
/** {@inheritDoc} */
@Override
public String toString() {
return row.toString() + "/" + column.toString() + "/" + timestamp;
}
/** {@inheritDoc} */
@Override
public boolean equals(Object obj) {
return compareTo(obj) == 0;
}
/** {@inheritDoc} */
@Override
public int hashCode() {
int result = this.row.hashCode();
result ^= this.column.hashCode();
result ^= this.timestamp;
return result;
}
// Comparable
public int compareTo(Object o) {
HStoreKey01Branch other = (HStoreKey01Branch)o;
int result = this.row.compareTo(other.row);
if (result != 0) {
return result;
}
result = this.column.compareTo(other.column);
if (result != 0) {
return result;
}
// The below older timestamps sorting ahead of newer timestamps looks
// wrong but it is intentional. This way, newer timestamps are first
// found when we iterate over a memcache and newer versions are the
// first we trip over when reading from a store file.
if (this.timestamp < other.timestamp) {
result = 1;
} else if (this.timestamp > other.timestamp) {
result = -1;
}
return result;
}
// Writable
/** {@inheritDoc} */
public void write(DataOutput out) throws IOException {
row.write(out);
column.write(out);
out.writeLong(timestamp);
}
/** {@inheritDoc} */
public void readFields(DataInput in) throws IOException {
row.readFields(in);
column.readFields(in);
timestamp = in.readLong();
}
/**
* Returns row and column bytes out of an HStoreKey.
* @param hsk Store key.
* @return byte array encoding of HStoreKey
* @throws UnsupportedEncodingException
*/
public static byte[] getBytes(final HStoreKey hsk)
throws UnsupportedEncodingException {
StringBuilder s = new StringBuilder(hsk.getRow().toString());
s.append(hsk.getColumn().toString());
return s.toString().getBytes(HConstants.UTF8_ENCODING);
}
}
}

View File

@ -19,6 +19,7 @@
*/ */
package org.apache.hadoop.hbase; package org.apache.hadoop.hbase;
import org.apache.hadoop.hbase.io.BatchOperation; import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell; import org.apache.hadoop.hbase.io.Cell;
@ -40,7 +41,7 @@ public class TestSerialization extends HBaseTestCase {
super.tearDown(); super.tearDown();
} }
public void testname() throws Exception { public void testHMsg() throws Exception {
HMsg m = new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE); HMsg m = new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE);
byte [] mb = Writables.getBytes(m); byte [] mb = Writables.getBytes(m);
HMsg deserializedHMsg = (HMsg)Writables.getWritable(mb, new HMsg()); HMsg deserializedHMsg = (HMsg)Writables.getWritable(mb, new HMsg());

View File

@ -153,7 +153,7 @@ public class TestHStoreFile extends HBaseTestCase {
assertTrue(this.fs.exists(refHsf.getMapFilePath())); assertTrue(this.fs.exists(refHsf.getMapFilePath()));
assertTrue(this.fs.exists(refHsf.getInfoFilePath())); assertTrue(this.fs.exists(refHsf.getInfoFilePath()));
HStoreFile.Reference otherReference = HStoreFile.Reference otherReference =
HStore.readSplitInfo(refHsf.getInfoFilePath(), this.fs); HStoreFile.readSplitInfo(refHsf.getInfoFilePath(), this.fs);
assertEquals(reference.getEncodedRegionName(), assertEquals(reference.getEncodedRegionName(),
otherReference.getEncodedRegionName()); otherReference.getEncodedRegionName());
assertEquals(reference.getFileId(), assertEquals(reference.getFileId(),

View File

@ -20,57 +20,50 @@
package org.apache.hadoop.hbase.util; package org.apache.hadoop.hbase.util;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.dfs.MiniDFSCluster; import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.log4j.Level; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.log4j.Logger; import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.io.RowResult;
/** /**
* *
*/ */
public class TestMigrate extends HBaseTestCase { public class TestMigrate extends HBaseTestCase {
static final Log LOG = LogFactory.getLog(TestMigrate.class); private static final Log LOG = LogFactory.getLog(TestMigrate.class);
/**
*
*/
public TestMigrate() {
super();
Logger.getRootLogger().setLevel(Level.WARN);
Logger.getLogger(this.getClass().getPackage().getName()).
setLevel(Level.DEBUG);
}
/** {@inheritDoc} */
@Override
protected void setUp() throws Exception {
super.setUp();
}
/** {@inheritDoc} */
@Override
protected void tearDown() throws Exception {
super.tearDown();
}
// This is the name of the table that is in the data file.
private static final String TABLENAME = "TestUpgrade";
// The table has two columns
private static final byte [][] TABLENAME_COLUMNS =
{Bytes.toBytes("column_a:"), Bytes.toBytes("column_b:")};
// Expected count of rows in migrated table.
private static final int EXPECTED_COUNT = 17576;
/** /**
* @throws IOException
* *
*/ */
public void testUpgrade() { public void testUpgrade() throws IOException {
MiniDFSCluster dfsCluster = null; MiniDFSCluster dfsCluster = null;
try { try {
dfsCluster = new MiniDFSCluster(conf, 2, true, (String[])null); dfsCluster = new MiniDFSCluster(conf, 2, true, (String[])null);
@ -81,63 +74,46 @@ public class TestMigrate extends HBaseTestCase {
Path root = dfs.makeQualified(new Path(conf.get(HConstants.HBASE_DIR))); Path root = dfs.makeQualified(new Path(conf.get(HConstants.HBASE_DIR)));
dfs.mkdirs(root); dfs.mkdirs(root);
/*
* First load files from an old style HBase file structure
*/
// Current directory is .../project/build/test/data
FileSystem localfs = FileSystem.getLocal(conf); FileSystem localfs = FileSystem.getLocal(conf);
// Get path for zip file. If running this test in eclipse, define
// Get path for zip file // the system property src.testdata for your test run.
String srcTestdata = System.getProperty("src.testdata");
FSDataInputStream hs = localfs.open(new Path(Path.CUR_DIR, if (srcTestdata == null) {
throw new NullPointerException("Define src.test system property");
// this path is for running test with ant }
Path data = new Path(srcTestdata, "HADOOP-2478-testdata.zip");
"../../../src/testdata/HADOOP-2478-testdata.zip") if (!localfs.exists(data)) {
throw new FileNotFoundException(data.toString());
// and this path is for when you want to run inside eclipse }
FSDataInputStream hs = localfs.open(data);
/*"src/testdata/HADOOP-2478-testdata.zip")*/
);
ZipInputStream zip = new ZipInputStream(hs); ZipInputStream zip = new ZipInputStream(hs);
unzip(zip, dfs, root); unzip(zip, dfs, root);
zip.close(); zip.close();
hs.close(); hs.close();
listPaths(dfs, root, root.toString().length() + 1); listPaths(dfs, root, root.toString().length() + 1);
Migrate u = new Migrate(conf); Migrate u = new Migrate(conf);
u.run(new String[] {"check"}); u.run(new String[] {"check"});
listPaths(dfs, root, root.toString().length() + 1); listPaths(dfs, root, root.toString().length() + 1);
u = new Migrate(conf); u = new Migrate(conf);
u.run(new String[] {"upgrade"}); u.run(new String[] {"upgrade"});
listPaths(dfs, root, root.toString().length() + 1); listPaths(dfs, root, root.toString().length() + 1);
// Remove version file and try again // Remove version file and try again
dfs.delete(new Path(root, HConstants.VERSION_FILE_NAME)); dfs.delete(new Path(root, HConstants.VERSION_FILE_NAME));
u = new Migrate(conf); u = new Migrate(conf);
u.run(new String[] {"upgrade"}); u.run(new String[] {"upgrade"});
listPaths(dfs, root, root.toString().length() + 1); listPaths(dfs, root, root.toString().length() + 1);
// Try again. No upgrade should be necessary // Try again. No upgrade should be necessary
u = new Migrate(conf); u = new Migrate(conf);
u.run(new String[] {"check"}); u.run(new String[] {"check"});
u = new Migrate(conf); u = new Migrate(conf);
u.run(new String[] {"upgrade"}); u.run(new String[] {"upgrade"});
} catch (Exception e) { // Now verify that can read contents.
e.printStackTrace(); verify();
} finally { } finally {
if (dfsCluster != null) { if (dfsCluster != null) {
shutdownDfs(dfsCluster); shutdownDfs(dfsCluster);
@ -145,14 +121,68 @@ public class TestMigrate extends HBaseTestCase {
} }
} }
/*
* Verify can read the migrated table.
* @throws IOException
*/
private void verify() throws IOException {
// Delete any cached connections. Need to do this because connection was
// created earlier when no master was around. The fact that there was no
// master gets cached. Need to delete so we go get master afresh.
HConnectionManager.deleteConnection(this.conf);
LOG.info("Start a cluster against migrated FS");
// Up number of retries. Needed while cluster starts up. Its been set to 1
// above.
this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER_KEY, 3);
MiniHBaseCluster cluster = new MiniHBaseCluster(this.conf, 1);
try {
HBaseAdmin hb = new HBaseAdmin(this.conf);
assertTrue(hb.isMasterRunning());
HTableDescriptor [] tables = hb.listTables();
boolean foundTable = false;
for (int i = 0; i < tables.length; i++) {
if (Bytes.equals(Bytes.toBytes(TABLENAME), tables[i].getName())) {
foundTable = true;
break;
}
}
assertTrue(foundTable);
LOG.info(TABLENAME + " exists. Creating an HTable to go against " +
TABLENAME + " and master " + this.conf.get(HConstants.MASTER_ADDRESS));
HTable t = new HTable(this.conf, TABLENAME);
int count = 0;
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LOG.info("OPENING SCANNER");
Scanner s = t.getScanner(TABLENAME_COLUMNS);
try {
for (RowResult r: s) {
if (r == null || r.size() == 0) {
break;
}
count++;
if (count % 1000 == 0 && count > 0) {
LOG.info("Iterated over " + count + " rows.");
}
}
assertEquals(EXPECTED_COUNT, count);
} finally {
s.close();
}
} finally {
cluster.shutdown();
}
}
private void unzip(ZipInputStream zip, FileSystem dfs, Path root) private void unzip(ZipInputStream zip, FileSystem dfs, Path root)
throws IOException { throws IOException {
ZipEntry e = null; ZipEntry e = null;
while ((e = zip.getNextEntry()) != null) { while ((e = zip.getNextEntry()) != null) {
if (e.isDirectory()) { if (e.isDirectory()) {
dfs.mkdirs(new Path(root, e.getName())); dfs.mkdirs(new Path(root, e.getName()));
} else { } else {
FSDataOutputStream out = dfs.create(new Path(root, e.getName())); FSDataOutputStream out = dfs.create(new Path(root, e.getName()));
byte[] buffer = new byte[4096]; byte[] buffer = new byte[4096];
@ -185,4 +215,4 @@ public class TestMigrate extends HBaseTestCase {
} }
} }
} }
} }