Compare commits

...

5 Commits

6 changed files with 1195 additions and 920 deletions

SequenceFile.java

@@ -770,11 +770,13 @@ public class SequenceFile {
return true;
}
@Override
public int hashCode() {
assert false : "hashCode not designed";
return 42; // any arbitrary constant will do
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("size: ").append(this.theMetadata.size()).append("\n");
@@ -952,6 +954,13 @@ public class SequenceFile {
}
}
/** flush all currently written data to the file system */
public void syncFs() throws IOException {
if (out != null) {
out.sync(); // flush contents to file system
}
}
/** Returns the configuration of this file. */
Configuration getConf() { return conf; }
@@ -1115,10 +1124,13 @@ public class SequenceFile {
}
@Override
boolean isCompressed() { return true; }
@Override
boolean isBlockCompressed() { return false; }
/** Append a key/value pair. */
@Override
@SuppressWarnings("unchecked")
public synchronized void append(Object key, Object val)
throws IOException {
@@ -1151,6 +1163,7 @@ public class SequenceFile {
}
/** Append a key/value pair. */
@Override
public synchronized void appendRaw(byte[] keyData, int keyOffset,
int keyLength, ValueBytes val) throws IOException {
@@ -1240,7 +1253,9 @@ public class SequenceFile {
finalizeFileHeader();
}
@Override
boolean isCompressed() { return true; }
@Override
boolean isBlockCompressed() { return true; }
/** Initialize */
@@ -1268,6 +1283,7 @@ public class SequenceFile {
}
/** Compress and flush contents to dfs */
@Override
public synchronized void sync() throws IOException {
if (noBufferedRecords > 0) {
super.sync();
@@ -1297,6 +1313,7 @@ public class SequenceFile {
}
/** Close the file. */
@Override
public synchronized void close() throws IOException {
if (out != null) {
sync();
@@ -1305,6 +1322,7 @@ public class SequenceFile {
}
/** Append a key/value pair. */
@Override
@SuppressWarnings("unchecked")
public synchronized void append(Object key, Object val)
throws IOException {
@@ -1337,6 +1355,7 @@ public class SequenceFile {
}
/** Append a key/value pair. */
@Override
public synchronized void appendRaw(byte[] keyData, int keyOffset,
int keyLength, ValueBytes val) throws IOException {
@@ -1929,6 +1948,7 @@ public class SequenceFile {
* of the value may be computed by calling buffer.getLength() before and
* after calls to this method. */
/** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
@Deprecated
public synchronized int next(DataOutputBuffer buffer) throws IOException {
// Unsupported for block-compressed sequence files
if (blockCompressed) {
@@ -2210,6 +2230,7 @@ public class SequenceFile {
}
/** Returns the name of the file. */
@Override
public String toString() {
return file.toString();
}
@@ -2798,6 +2819,7 @@ public class SequenceFile {
this.tmpDir = tmpDir;
this.progress = progress;
}
@Override
protected boolean lessThan(Object a, Object b) {
// indicate we're making progress
if (progress != null) {
@@ -2933,7 +2955,7 @@ public class SequenceFile {
totalBytes += segmentsToMerge.get(i).segmentLength;
}
if (totalBytes != 0) //being paranoid
- progPerByte = 1.0f / (float)totalBytes;
+ progPerByte = 1.0f / totalBytes;
//reset factor to what it originally was
factor = origFactor;
return this;
@@ -3055,6 +3077,7 @@ public class SequenceFile {
compareTo(that.segmentPathName.toString());
}
@Override
public boolean equals(Object o) {
if (!(o instanceof SegmentDescriptor)) {
return false;
@@ -3069,6 +3092,7 @@ public class SequenceFile {
return false;
}
@Override
public int hashCode() {
return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
}
@@ -3159,6 +3183,7 @@ public class SequenceFile {
/** The default cleanup. Subclasses can override this with a custom
* cleanup
*/
@Override
public void cleanup() throws IOException {
super.close();
if (super.shouldPreserveInput()) return;
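
The substantive change in this file is the new Writer.syncFs() method, which flushes buffered records down to the file system so that a reader can see them even if the writing process dies before close(). Below is a minimal sketch of a client using it; the path, record count, and sync interval are hypothetical, and the short createWriter overload is assumed to behave like the stock Hadoop one.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.SequenceFile;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;

public class SyncFsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path log = new Path("/tmp/syncfs-demo");  // hypothetical path
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, log, IntWritable.class, BytesWritable.class);
    for (int i = 1; i <= 1000; i++) {
      writer.append(new IntWritable(i), new BytesWritable(new byte[] { (byte) i }));
      if (i % 100 == 0) {
        writer.syncFs();  // flush everything written so far to the file system
      }
    }
    writer.close();
  }
}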

File diff suppressed because it is too large

Reader.java

@@ -0,0 +1,114 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.io.EOFException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.hbase.io.SequenceFile;
/** Tries to read the file created by Writer */
public class Reader {
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
if (args.length < 1 || args.length > 2) {
System.err.println("usage: Reader expected-number-of-records [ -n ]");
System.err.println(" where -n = do not try to recover lease");
return;
}
int expected = Integer.parseInt(args[0]);
boolean recover = true;
if (args.length == 2 && args[1].equals("-n")) {
recover = false;
}
Configuration conf = new Configuration();
Path dir = new Path(conf.get("fs.default.name"), "log");
FileSystem fs = FileSystem.get(conf);
if (!(fs instanceof DistributedFileSystem)) {
throw new IOException("Wrong file system: " + fs.getClass().getName());
}
if (recover) {
waitForLeaseRecovery(fs, new Path(dir, "log"));
}
SequenceFile.Reader in = null;
try {
in = new SequenceFile.Reader(fs, new Path(dir, "log"), conf);
} catch (EOFException e) {
if (expected != 0) {
e.printStackTrace();
}
return;
}
IntWritable key = new IntWritable();
BytesWritable value = new BytesWritable();
int count = 0;
IOException ex = null;
try {
while (in.next(key, value)) {
count++;
}
} catch (IOException e) {
ex = e;
}
if (expected != count) {
System.err.println("Read " + count + " lines, expected " + expected +
" lines");
}
in.close();
if (ex != null) {
ex.printStackTrace();
}
}
static void waitForLeaseRecovery(FileSystem fs, Path file) {
boolean done = false;
while (!done) {
try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
System.out.println("Sleep interrupted.");
}
try {
FSDataOutputStream out = fs.append(file);
out.close();
done = true;
} catch (IOException e) {
System.out.println("Triggering lease recovery if needed.");
}
}
System.out.println("Lease Recovery Successful");
}
}
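
The crux of Reader is waitForLeaseRecovery: opening the file for append forces the NameNode to recover the dead writer's lease, and the probe simply retries the append-and-close until it succeeds, at which point the file length should be finalized and safe to read. The same idea distilled into a commented sketch (class and method names are hypothetical; HDFS append support is assumed to be enabled):

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class LeaseProbe {
  // Retry an append-and-close until the NameNode has recovered the
  // previous writer's lease; until then, append() fails with an IOException.
  static void waitForLease(FileSystem fs, Path file) throws InterruptedException {
    while (true) {
      try {
        fs.append(file).close();  // succeeds once the lease is recovered
        return;
      } catch (IOException e) {
        Thread.sleep(10 * 1000);  // recovery still in progress; retry
      }
    }
  }
}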

Writer.java

@@ -0,0 +1,93 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.hbase.io.SequenceFile;
/** Writes to a SequenceFile and then kills the JVM without closing the file */
public class Writer {
private static byte[] bytes = new byte[1020];
private static BytesWritable value;
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
if (args.length != 3) {
System.err.println("usage: Writer total-writes writes-per-sync block-size-mb");
return;
}
long blocksize = Long.parseLong(args[2]);
if (blocksize != 1L && blocksize != 64L) {
System.err.println("Only 1MB and 64MB blocksizes are allowed");
return;
}
blocksize *= 1024L * 1024L;
for (int i = 0; i < bytes.length; i++) {
bytes[i] = (byte)(i % 64);
}
value = new BytesWritable(bytes);
Configuration conf = new Configuration();
Path dir = new Path(conf.get("fs.default.name"), "log");
conf.set("fs.default.name", dir.toString());
FileSystem fs = FileSystem.get(conf);
if (!(fs instanceof DistributedFileSystem)) {
throw new IOException("Wrong file system: " + fs.getClass().getName());
}
fs.mkdirs(dir);
Path file = new Path(dir, "log");
if (fs.exists(file)) {
fs.delete(file, false);
}
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, file,
IntWritable.class, BytesWritable.class,
fs.getConf().getInt("io.file.buffer.size", 4096),
fs.getDefaultReplication(), blocksize,
SequenceFile.CompressionType.NONE, new DefaultCodec(), null,
new SequenceFile.Metadata());
int totalWrites = Integer.parseInt(args[0]);
int writesPerSync = Integer.parseInt(args[1]);
for (int i = 1; i <= totalWrites; i++) {
writer.append(new IntWritable(i), value);
if (i % writesPerSync == 0) {
writer.syncFs();
}
}
// The following *should* prevent hdfs shutdown hook from running
Runtime.getRuntime().halt(-1);
}
}
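
Taken together, the two programs form a crash test for syncFs(): Writer appends total-writes records, calls syncFs() every writes-per-sync appends, and then halts the JVM so the file is never closed; Reader waits for lease recovery and counts what survived. Records up to the last completed syncFs() should be durable, while anything appended after it may or may not be, which gives a lower bound for Reader's expected-number-of-records argument. A small sketch of that arithmetic (names hypothetical):

class ExpectedCount {
  // Everything up to the last completed syncFs() should have survived the
  // crash; the trailing (totalWrites % writesPerSync) records were never synced.
  static long minExpectedRecords(long totalWrites, long writesPerSync) {
    return (totalWrites / writesPerSync) * writesPerSync;
  }
}

For example, after "Writer 10000 100 1" the final append coincides with a sync, so Reader should count all 10000 records; after "Writer 10050 100 1" it should count at least 10000, with the last 50 records possibly lost.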