diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 39475c193aa..2a80119b0f9 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -71,6 +71,9 @@ Trunk (Unreleased) MAPREDUCE-5014. Extend Distcp to accept a custom CopyListing. (Srikanth Sundarrajan via amareshwari) + MAPREDUCE-5197. Add a service for checkpointing task state. + (Carlo Curino via cdouglas) + BUG FIXES MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/CheckpointID.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/CheckpointID.java new file mode 100644 index 00000000000..4e3c3d66fa6 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/CheckpointID.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.checkpoint; + +import org.apache.hadoop.io.Writable; + +/** + * This class represent the identified (memento) for a checkpoint. It is allowed + * to contain small amount of metadata about a checkpoint and must provide + * sufficient information to the corresponding CheckpointService to locate and + * retrieve the data contained in the checkpoint. + */ +public interface CheckpointID extends Writable { + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/CheckpointNamingService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/CheckpointNamingService.java new file mode 100644 index 00000000000..0bb99a49bf2 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/CheckpointNamingService.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.checkpoint; + +/** + * This class represent a naming service for checkpoints. + */ +public interface CheckpointNamingService { + + /** + * Generate a new checkpoint Name + * @return the checkpoint name + */ + public String getNewName(); + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/CheckpointService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/CheckpointService.java new file mode 100644 index 00000000000..7fc4d689ff3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/CheckpointService.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.checkpoint; + +import java.io.IOException; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; + +/** + * The CheckpointService provides a simple API to store and retrieve the state of a task. + * + * Checkpoints are atomic, single-writer, write-once, multiple-readers, + * ready-many type of objects. This is provided by releasing the CheckpointID + * for a checkpoint only upon commit of the checkpoint, and by preventing a + * checkpoint to be re-opened for writes. + * + * Non-functional properties such as durability, availability, compression, + * garbage collection, quotas are left to the implementation. + * + * This API is envisioned as the basic building block for a checkpoint service, + * on top of which richer interfaces can be layered (e.g., frameworks providing + * object-serialization, checkpoint metadata and provenance, etc.) + * + */ +public interface CheckpointService { + + public interface CheckpointWriteChannel extends WritableByteChannel { } + public interface CheckpointReadChannel extends ReadableByteChannel { } + + /** + * This method creates a checkpoint and provide a channel to write to it. The + * name/location of the checkpoint are unknown to the user as of this time, in + * fact, the CheckpointID is not released to the user until commit is called. + * This makes enforcing atomicity of writes easy. + * @return a channel that can be used to write to the checkpoint + * @throws IOException + * @throws InterruptedException + */ + public CheckpointWriteChannel create() + throws IOException, InterruptedException; + + /** + * Used to finalize and existing checkpoint. It returns the CheckpointID that + * can be later used to access (read-only) this checkpoint. This guarantees + * atomicity of the checkpoint. + * @param ch the CheckpointWriteChannel to commit + * @return a CheckpointID + * @throws IOException + * @throws InterruptedException + */ + public CheckpointID commit(CheckpointWriteChannel ch) + throws IOException, InterruptedException; + + /** + * Dual to commit, it aborts the current checkpoint. Garbage collection + * choices are left to the implementation. The CheckpointID is not generated + * nor released to the user so the checkpoint is not accessible. + * @param ch the CheckpointWriteChannel to abort + * @throws IOException + * @throws InterruptedException + */ + public void abort(CheckpointWriteChannel ch) + throws IOException, InterruptedException; + + /** + * Given a CheckpointID returns a reading channel. + * @param id CheckpointID for the checkpoint to be opened + * @return a CheckpointReadChannel + * @throws IOException + * @throws InterruptedException + */ + public CheckpointReadChannel open(CheckpointID id) + throws IOException, InterruptedException; + + /** + * It discards an existing checkpoint identified by its CheckpointID. + * @param id CheckpointID for the checkpoint to be deleted + * @return a boolean confirming success of the deletion + * @throws IOException + * @throws InterruptedException + */ + public boolean delete(CheckpointID id) + throws IOException, InterruptedException; + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/FSCheckpointID.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/FSCheckpointID.java new file mode 100644 index 00000000000..196146c0035 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/FSCheckpointID.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.checkpoint; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +/** + * A FileSystem based checkpoint ID contains reference to the Path + * where the checkpoint has been saved. + */ +public class FSCheckpointID implements CheckpointID { + + private Path path; + + public FSCheckpointID(){ + } + + public FSCheckpointID(Path path) { + this.path = path; + } + + public Path getPath() { + return path; + } + + @Override + public String toString() { + return path.toString(); + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, path.toString()); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.path = new Path(Text.readString(in)); + } + + @Override + public boolean equals(Object other) { + return other instanceof FSCheckpointID + && path.equals(((FSCheckpointID)other).path); + } + + @Override + public int hashCode() { + return path.hashCode(); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/FSCheckpointService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/FSCheckpointService.java new file mode 100644 index 00000000000..18a92561ac4 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/FSCheckpointService.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.checkpoint; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * A FileSystem based CheckpointService. + */ +public class FSCheckpointService implements CheckpointService { + + private final Path base; + private final FileSystem fs; + private final CheckpointNamingService namingPolicy; + private final short replication; + + public FSCheckpointService(FileSystem fs, Path base, + CheckpointNamingService namingPolicy, short replication) { + this.fs = fs; + this.base = base; + this.namingPolicy = namingPolicy; + this.replication = replication; + } + + public CheckpointWriteChannel create() + throws IOException { + + String name = namingPolicy.getNewName(); + + Path p = new Path(name); + if (p.isUriPathAbsolute()) { + throw new IOException("Checkpoint cannot be an absolute path"); + } + return createInternal(new Path(base, p)); + } + + CheckpointWriteChannel createInternal(Path name) throws IOException { + + //create a temp file, fail if file exists + return new FSCheckpointWriteChannel(name, fs.create(tmpfile(name), + replication)); + } + + private static class FSCheckpointWriteChannel + implements CheckpointWriteChannel { + private boolean isOpen = true; + private final Path finalDst; + private final WritableByteChannel out; + + FSCheckpointWriteChannel(Path finalDst, FSDataOutputStream out) { + this.finalDst = finalDst; + this.out = Channels.newChannel(out); + } + + public int write(ByteBuffer b) throws IOException { + return out.write(b); + } + + public Path getDestination() { + return finalDst; + } + + @Override + public void close() throws IOException { + isOpen=false; + out.close(); + } + + @Override + public boolean isOpen() { + return isOpen; + } + + } + + @Override + public CheckpointReadChannel open(CheckpointID id) + throws IOException, InterruptedException { + if (!(id instanceof FSCheckpointID)) { + throw new IllegalArgumentException( + "Mismatched checkpoint type: " + id.getClass()); + } + return new FSCheckpointReadChannel( + fs.open(((FSCheckpointID) id).getPath())); + } + + private static class FSCheckpointReadChannel + implements CheckpointReadChannel { + + private boolean isOpen = true; + private final ReadableByteChannel in; + + FSCheckpointReadChannel(FSDataInputStream in){ + this.in = Channels.newChannel(in); + } + + @Override + public int read(ByteBuffer bb) throws IOException { + return in.read(bb); + } + + @Override + public void close() throws IOException { + isOpen = false; + in.close(); + } + + @Override + public boolean isOpen() { + return isOpen; + } + + } + + @Override + public CheckpointID commit(CheckpointWriteChannel ch) + throws IOException, InterruptedException { + if (ch.isOpen()) { + ch.close(); + } + FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel)ch; + Path dst = hch.getDestination(); + if (!fs.rename(tmpfile(dst), dst)) { + // attempt to clean up + abort(ch); + throw new IOException("Failed to promote checkpoint" + + tmpfile(dst) + " -> " + dst); + } + return new FSCheckpointID(hch.getDestination()); + } + + @Override + public void abort(CheckpointWriteChannel ch) throws IOException { + if (ch.isOpen()) { + ch.close(); + } + FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel)ch; + Path tmp = tmpfile(hch.getDestination()); + try { + if (!fs.delete(tmp, false)) { + throw new IOException("Failed to delete checkpoint during abort"); + } + } catch (FileNotFoundException e) { + // IGNORE + } + } + + @Override + public boolean delete(CheckpointID id) throws IOException, + InterruptedException { + if (!(id instanceof FSCheckpointID)) { + throw new IllegalArgumentException( + "Mismatched checkpoint type: " + id.getClass()); + } + Path tmp = ((FSCheckpointID)id).getPath(); + try { + return fs.delete(tmp, false); + } catch (FileNotFoundException e) { + // IGNORE + } + return true; + } + + static final Path tmpfile(Path p) { + return new Path(p.getParent(), p.getName() + ".tmp"); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/RandomNameCNS.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/RandomNameCNS.java new file mode 100644 index 00000000000..7387c1c4915 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/RandomNameCNS.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.checkpoint; + +import org.apache.commons.lang.RandomStringUtils; + +/** + * Simple naming service that generates a random checkpoint name. + */ +public class RandomNameCNS implements CheckpointNamingService { + + @Override + public String getNewName() { + return "checkpoint_" + RandomStringUtils.randomAlphanumeric(8); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/SimpleNamingService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/SimpleNamingService.java new file mode 100644 index 00000000000..85630ad5309 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/SimpleNamingService.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.checkpoint; + +/** + * A naming service that simply returns the name it has been initialized with. + */ +public class SimpleNamingService implements CheckpointNamingService{ + + final String name; + + public SimpleNamingService(String name){ + this.name = name; + } + + /** + * Generate a new checkpoint Name + * @return the checkpoint name + */ + public String getNewName(){ + return "checkpoint_" + name; + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/TestFSCheckpointID.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/TestFSCheckpointID.java new file mode 100644 index 00000000000..58abb32a28b --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/TestFSCheckpointID.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.checkpoint; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.junit.Test; + +public class TestFSCheckpointID { + + @Test + public void testFSCheckpointIDSerialization() throws IOException { + + Path inpath = new Path("/tmp/blah"); + FSCheckpointID cidin = new FSCheckpointID(inpath); + DataOutputBuffer out = new DataOutputBuffer(); + cidin.write(out); + out.close(); + + FSCheckpointID cidout = new FSCheckpointID(null); + DataInputBuffer in = new DataInputBuffer(); + in.reset(out.getData(), 0, out.getLength()); + cidout.readFields(in); + in.close(); + + assert cidin.equals(cidout); + + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/TestFSCheckpointService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/TestFSCheckpointService.java new file mode 100644 index 00000000000..d60c908b15a --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/TestFSCheckpointService.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.checkpoint; + +import java.nio.ByteBuffer; + +import java.util.Arrays; +import java.util.Random; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.mapreduce.checkpoint.CheckpointService.CheckpointWriteChannel; +import org.junit.Test; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; +import org.mockito.*; + +public class TestFSCheckpointService { + + private final int BUFSIZE = 1024; + + @Test + public void testCheckpointCreate() throws Exception { + checkpointCreate(ByteBuffer.allocate(BUFSIZE)); + } + + @Test + public void testCheckpointCreateDirect() throws Exception { + checkpointCreate(ByteBuffer.allocateDirect(BUFSIZE)); + } + + public void checkpointCreate(ByteBuffer b) throws Exception { + int WRITES = 128; + FileSystem fs = mock(FileSystem.class); + DataOutputBuffer dob = new DataOutputBuffer(); + FSDataOutputStream hdfs = spy(new FSDataOutputStream(dob, null)); + @SuppressWarnings("resource") // backed by array + DataOutputBuffer verif = new DataOutputBuffer(); + when(fs.create(isA(Path.class), eq((short)1))).thenReturn(hdfs); + when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true); + + Path base = new Path("/chk"); + Path finalLoc = new Path("/chk/checkpoint_chk0"); + Path tmp = FSCheckpointService.tmpfile(finalLoc); + + FSCheckpointService chk = new FSCheckpointService(fs, base, + new SimpleNamingService("chk0"), (short) 1); + CheckpointWriteChannel out = chk.create(); + + Random r = new Random(); + + final byte[] randBytes = new byte[BUFSIZE]; + for (int i = 0; i < WRITES; ++i) { + r.nextBytes(randBytes); + int s = r.nextInt(BUFSIZE - 1); + int e = r.nextInt(BUFSIZE - s) + 1; + verif.write(randBytes, s, e); + b.clear(); + b.put(randBytes).flip(); + b.position(s).limit(b.position() + e); + out.write(b); + } + verify(fs, never()).rename(any(Path.class), eq(finalLoc)); + CheckpointID cid = chk.commit(out); + verify(hdfs).close(); + verify(fs).rename(eq(tmp), eq(finalLoc)); + + assertArrayEquals(Arrays.copyOfRange(verif.getData(), 0, verif.getLength()), + Arrays.copyOfRange(dob.getData(), 0, dob.getLength())); + } + + @Test + public void testDelete() throws Exception { + FileSystem fs = mock(FileSystem.class); + Path chkloc = new Path("/chk/chk0"); + when(fs.delete(eq(chkloc), eq(false))).thenReturn(true); + Path base = new Path("/otherchk"); + FSCheckpointID id = new FSCheckpointID(chkloc); + FSCheckpointService chk = new FSCheckpointService(fs, base, + new SimpleNamingService("chk0"), (short) 1); + assertTrue(chk.delete(id)); + verify(fs).delete(eq(chkloc), eq(false)); + } + +}