diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java index 224cac17b0e..3f736d45d2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.util.VersionInfo; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -105,6 +106,25 @@ public final class FederationUtil { return ret; } + /** + * Fetch the compile timestamp for this jar. + * + * @return Date compiled. + */ + public static String getBuildVersion() { + return VersionInfo.getBuildVersion(); + } + + /** + * Fetch the build/compile information for this jar. + * + * @return String Compilation info. + */ + public static String getCompileInfo() { + return VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from " + + VersionInfo.getBranch(); + } + /** * Create an instance of an interface with a constructor using a context. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java new file mode 100644 index 00000000000..25a64668db1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +/** + * States of the Router. + */ +public enum RouterServiceState { + NONE, + INITIALIZING, + SAFEMODE, + RUNNING, + STOPPING, + SHUTDOWN, + EXPIRED; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RouterStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RouterStore.java new file mode 100644 index 00000000000..c6a0dad01bf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RouterStore.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; + +/** + * Management API for + * {@link org.apache.hadoop.hdfs.server.federation.store.records.RouterState + * RouterState} records in the state store. Accesses the data store via the + * {@link org.apache.hadoop.hdfs.server.federation.store.driver. + * StateStoreDriver StateStoreDriver} interface. No data is cached. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class RouterStore extends CachedRecordStore { + + public RouterStore(StateStoreDriver driver) { + super(RouterState.class, driver, true); + } + + /** + * Fetches the current router state object. + * + * @param request Fully populated request object. + * @return The matching router record or null if none exists. + * @throws IOException Throws exception if unable to query the data store or + * if more than one matching record is found. + */ + public abstract GetRouterRegistrationResponse getRouterRegistration( + GetRouterRegistrationRequest request) throws IOException; + + /** + * Fetches all router status objects. + * + * @param request Fully populated request object. + * @return List of Router records present in the data store. + * @throws IOException Throws exception if unable to query the data store + */ + public abstract GetRouterRegistrationsResponse getRouterRegistrations( + GetRouterRegistrationsRequest request) throws IOException; + + /** + * Update the state of this router in the State Store. + * + * @param request Fully populated request object. + * @return True if the update was successfully recorded, false otherwise. + * @throws IOException Throws exception if unable to query the data store + */ + public abstract RouterHeartbeatResponse routerHeartbeat( + RouterHeartbeatRequest request) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java new file mode 100644 index 00000000000..d58c2881cbd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.impl; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.federation.store.RouterStore; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; + +/** + * Implementation of the {@link RouterStore} state store API. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class RouterStoreImpl extends RouterStore { + + public RouterStoreImpl(StateStoreDriver driver) { + super(driver); + } + + @Override + public GetRouterRegistrationResponse getRouterRegistration( + GetRouterRegistrationRequest request) throws IOException { + + final RouterState partial = RouterState.newInstance(); + partial.setAddress(request.getRouterId()); + final Query query = new Query(partial); + RouterState record = getDriver().get(getRecordClass(), query); + if (record != null) { + overrideExpiredRecord(record); + } + GetRouterRegistrationResponse response = + GetRouterRegistrationResponse.newInstance(); + response.setRouter(record); + return response; + } + + @Override + public GetRouterRegistrationsResponse getRouterRegistrations( + GetRouterRegistrationsRequest request) throws IOException { + + // Get all values from the cache + QueryResult recordsAndTimeStamp = + getCachedRecordsAndTimeStamp(); + List records = recordsAndTimeStamp.getRecords(); + long timestamp = recordsAndTimeStamp.getTimestamp(); + + // Generate response + GetRouterRegistrationsResponse response = + GetRouterRegistrationsResponse.newInstance(); + response.setRouters(records); + response.setTimestamp(timestamp); + return response; + } + + @Override + public RouterHeartbeatResponse routerHeartbeat(RouterHeartbeatRequest request) + throws IOException { + + RouterState record = request.getRouter(); + boolean status = getDriver().put(record, true, false); + RouterHeartbeatResponse response = + RouterHeartbeatResponse.newInstance(status); + return response; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetRouterRegistrationRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetRouterRegistrationRequest.java new file mode 100644 index 00000000000..9ba5788da57 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetRouterRegistrationRequest.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * API request for retrieving a single router registration present in the state + * store. + */ +public abstract class GetRouterRegistrationRequest { + + public static GetRouterRegistrationRequest newInstance() { + return StateStoreSerializer.newRecord(GetRouterRegistrationRequest.class); + } + + public static GetRouterRegistrationRequest newInstance(String routerId) { + GetRouterRegistrationRequest request = newInstance(); + request.setRouterId(routerId); + return request; + } + + @Public + @Unstable + public abstract String getRouterId(); + + @Public + @Unstable + public abstract void setRouterId(String routerId); +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetRouterRegistrationResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetRouterRegistrationResponse.java new file mode 100644 index 00000000000..b5693766536 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetRouterRegistrationResponse.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; + +/** + * API response for retrieving a single router registration present in the state + * store. + */ +public abstract class GetRouterRegistrationResponse { + + public static GetRouterRegistrationResponse newInstance() throws IOException { + return StateStoreSerializer.newRecord(GetRouterRegistrationResponse.class); + } + + @Public + @Unstable + public abstract RouterState getRouter() throws IOException; + + @Public + @Unstable + public abstract void setRouter(RouterState router) throws IOException; +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetRouterRegistrationsRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetRouterRegistrationsRequest.java new file mode 100644 index 00000000000..b70cccf593a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetRouterRegistrationsRequest.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * API request for retrieving a all non-expired router registrations present in + * the state store. + */ +public abstract class GetRouterRegistrationsRequest { + + public static GetRouterRegistrationsRequest newInstance() throws IOException { + return StateStoreSerializer.newRecord(GetRouterRegistrationsRequest.class); + } + +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetRouterRegistrationsResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetRouterRegistrationsResponse.java new file mode 100644 index 00000000000..94e35c6d4cf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetRouterRegistrationsResponse.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; + +/** + * API response for retrieving a all non-expired router registrations present in + * the state store. + */ +public abstract class GetRouterRegistrationsResponse { + + public static GetRouterRegistrationsResponse newInstance() + throws IOException { + return StateStoreSerializer.newRecord(GetRouterRegistrationsResponse.class); + } + + @Public + @Unstable + public abstract List getRouters() throws IOException; + + @Public + @Unstable + public abstract void setRouters(List routers) + throws IOException; + + @Public + @Unstable + public abstract long getTimestamp(); + + @Public + @Unstable + public abstract void setTimestamp(long time); +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RouterHeartbeatRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RouterHeartbeatRequest.java new file mode 100644 index 00000000000..97d637bdcb1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RouterHeartbeatRequest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; + +/** + * API request for registering a router with the state store. + */ +public abstract class RouterHeartbeatRequest { + + public static RouterHeartbeatRequest newInstance() throws IOException { + return StateStoreSerializer.newRecord(RouterHeartbeatRequest.class); + } + + public static RouterHeartbeatRequest newInstance(RouterState router) + throws IOException { + RouterHeartbeatRequest request = newInstance(); + request.setRouter(router); + return request; + } + + @Public + @Unstable + public abstract RouterState getRouter() throws IOException; + + @Public + @Unstable + public abstract void setRouter(RouterState routerState); +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RouterHeartbeatResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RouterHeartbeatResponse.java new file mode 100644 index 00000000000..5912e9fa5ac --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RouterHeartbeatResponse.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * API response for registering a router with the state store. + */ +public abstract class RouterHeartbeatResponse { + + public static RouterHeartbeatResponse newInstance() throws IOException { + return StateStoreSerializer.newRecord(RouterHeartbeatResponse.class); + } + + public static RouterHeartbeatResponse newInstance(boolean status) + throws IOException { + RouterHeartbeatResponse response = newInstance(); + response.setStatus(status); + return response; + } + + @Public + @Unstable + public abstract boolean getStatus(); + + @Public + @Unstable + public abstract void setStatus(boolean result); +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetRouterRegistrationRequestPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetRouterRegistrationRequestPBImpl.java new file mode 100644 index 00000000000..19bf4ec9a0e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetRouterRegistrationRequestPBImpl.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationRequestProto.Builder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationRequestProtoOrBuilder; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * GetRouterRegistrationRequest. + */ +public class GetRouterRegistrationRequestPBImpl + extends GetRouterRegistrationRequest implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator( + GetRouterRegistrationRequestProto.class); + + public GetRouterRegistrationRequestPBImpl() { + } + + @Override + public GetRouterRegistrationRequestProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public String getRouterId() { + return this.translator.getProtoOrBuilder().getRouterId(); + } + + @Override + public void setRouterId(String routerId) { + this.translator.getBuilder().setRouterId(routerId); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetRouterRegistrationResponsePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetRouterRegistrationResponsePBImpl.java new file mode 100644 index 00000000000..ae17e9215b4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetRouterRegistrationResponsePBImpl.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationResponseProto.Builder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationResponseProtoOrBuilder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterRecordProto; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.RouterStatePBImpl; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * GetRouterRegistrationResponse. + */ +public class GetRouterRegistrationResponsePBImpl + extends GetRouterRegistrationResponse implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator< + GetRouterRegistrationResponseProto, Builder, + GetRouterRegistrationResponseProtoOrBuilder>( + GetRouterRegistrationResponseProto.class); + + public GetRouterRegistrationResponsePBImpl() { + } + + @Override + public GetRouterRegistrationResponseProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message protocol) { + this.translator.setProto(protocol); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public RouterState getRouter() { + RouterRecordProto proto = this.translator.getProtoOrBuilder().getRouter(); + return new RouterStatePBImpl(proto); + } + + @Override + public void setRouter(RouterState router) { + if (router instanceof RouterStatePBImpl) { + RouterStatePBImpl routerPB = (RouterStatePBImpl)router; + RouterRecordProto routerProto = routerPB.getProto(); + this.translator.getBuilder().setRouter(routerProto); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetRouterRegistrationsRequestPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetRouterRegistrationsRequestPBImpl.java new file mode 100644 index 00000000000..4b486827fdf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetRouterRegistrationsRequestPBImpl.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationsRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationsRequestProto.Builder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationsRequestProtoOrBuilder; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * GetRouterRegistrationsRequest. + */ +public class GetRouterRegistrationsRequestPBImpl + extends GetRouterRegistrationsRequest implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator< + GetRouterRegistrationsRequestProto, Builder, + GetRouterRegistrationsRequestProtoOrBuilder>( + GetRouterRegistrationsRequestProto.class); + + public GetRouterRegistrationsRequestPBImpl() { + } + + @Override + public GetRouterRegistrationsRequestProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetRouterRegistrationsResponsePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetRouterRegistrationsResponsePBImpl.java new file mode 100644 index 00000000000..2d597fb44cb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetRouterRegistrationsResponsePBImpl.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationsResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationsResponseProto.Builder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationsResponseProtoOrBuilder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterRecordProto; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.RouterStatePBImpl; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * GetRouterRegistrationsResponse. + */ +public class GetRouterRegistrationsResponsePBImpl + extends GetRouterRegistrationsResponse implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator< + GetRouterRegistrationsResponseProto, Builder, + GetRouterRegistrationsResponseProtoOrBuilder>( + GetRouterRegistrationsResponseProto.class); + + public GetRouterRegistrationsResponsePBImpl() { + + } + + @Override + public GetRouterRegistrationsResponseProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public List getRouters() throws IOException { + + List ret = new ArrayList(); + List memberships = + this.translator.getProtoOrBuilder().getRoutersList(); + for (RouterRecordProto memberProto : memberships) { + RouterState membership = new RouterStatePBImpl(memberProto); + ret.add(membership); + } + return ret; + } + + @Override + public void setRouters(List records) throws IOException { + + this.translator.getBuilder().clearRouters(); + for (RouterState router : records) { + if (router instanceof RouterStatePBImpl) { + RouterStatePBImpl routerPB = (RouterStatePBImpl) router; + this.translator.getBuilder().addRouters(routerPB.getProto()); + } + } + } + + @Override + public long getTimestamp() { + return this.translator.getProtoOrBuilder().getTimestamp(); + } + + @Override + public void setTimestamp(long time) { + this.translator.getBuilder().setTimestamp(time); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RouterHeartbeatRequestPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RouterHeartbeatRequestPBImpl.java new file mode 100644 index 00000000000..cc2be04ab9d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RouterHeartbeatRequestPBImpl.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterHeartbeatRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterHeartbeatRequestProto.Builder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterHeartbeatRequestProtoOrBuilder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterRecordProto; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.RouterStatePBImpl; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * RouterHeartbeatRequest. + */ +public class RouterHeartbeatRequestPBImpl extends RouterHeartbeatRequest + implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator( + RouterHeartbeatRequestProto.class); + + public RouterHeartbeatRequestPBImpl() { + } + + @Override + public RouterHeartbeatRequestProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public RouterState getRouter() throws IOException { + RouterRecordProto routerProto = + this.translator.getProtoOrBuilder().getRouter(); + return new RouterStatePBImpl(routerProto); + } + + @Override + public void setRouter(RouterState routerState) { + if (routerState instanceof RouterStatePBImpl) { + RouterStatePBImpl routerStatePB = (RouterStatePBImpl)routerState; + this.translator.getBuilder().setRouter(routerStatePB.getProto()); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RouterHeartbeatResponsePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RouterHeartbeatResponsePBImpl.java new file mode 100644 index 00000000000..ac534fd49df --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RouterHeartbeatResponsePBImpl.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterHeartbeatResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterHeartbeatResponseProto.Builder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterHeartbeatResponseProtoOrBuilder; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * RouterHeartbeatResponse. + */ +public class RouterHeartbeatResponsePBImpl extends RouterHeartbeatResponse + implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator( + RouterHeartbeatResponseProto.class); + + public RouterHeartbeatResponsePBImpl() { + } + + @Override + public RouterHeartbeatResponseProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public boolean getStatus() { + return this.translator.getProtoOrBuilder().getStatus(); + } + + @Override + public void setStatus(boolean result) { + this.translator.getBuilder().setStatus(result); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java new file mode 100644 index 00000000000..ccdd392681a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.records; + +import java.io.IOException; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; +import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Entry to log the state of a + * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router} in the + * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService + * FederationStateStoreService}. + */ +public abstract class RouterState extends BaseRecord { + + private static final Logger LOG = LoggerFactory.getLogger(RouterState.class); + + /** Expiration time in ms for this entry. */ + private static long expirationMs; + + /** + * Constructors. + */ + public RouterState() { + super(); + } + + public static RouterState newInstance() { + RouterState record = StateStoreSerializer.newRecord(RouterState.class); + record.init(); + return record; + } + + public static RouterState newInstance(String addr, long startTime, + RouterServiceState status) { + RouterState record = newInstance(); + record.setDateStarted(startTime); + record.setAddress(addr); + record.setStatus(status); + record.setCompileInfo(FederationUtil.getCompileInfo()); + record.setBuildVersion(FederationUtil.getBuildVersion()); + return record; + } + + public abstract void setAddress(String address); + + public abstract void setDateStarted(long dateStarted); + + public abstract String getAddress(); + + public abstract StateStoreVersion getStateStoreVersion() throws IOException; + + public abstract void setStateStoreVersion(StateStoreVersion version); + + public abstract RouterServiceState getStatus(); + + public abstract void setStatus(RouterServiceState newStatus); + + public abstract String getBuildVersion(); + + public abstract void setBuildVersion(String version); + + public abstract String getCompileInfo(); + + public abstract void setCompileInfo(String info); + + public abstract long getDateStarted(); + + /** + * Get the identifier for the Router. It uses the address. + * + * @return Identifier for the Router. + */ + public String getRouterId() { + return getAddress(); + } + + @Override + public boolean like(BaseRecord o) { + if (o instanceof RouterState) { + RouterState other = (RouterState)o; + if (getAddress() != null && + !getAddress().equals(other.getAddress())) { + return false; + } + if (getStatus() != null && + !getStatus().equals(other.getStatus())) { + return false; + } + return true; + } + return false; + } + + @Override + public String toString() { + return getAddress() + " -> " + getStatus() + "," + getBuildVersion(); + } + + @Override + public SortedMap getPrimaryKeys() { + SortedMap map = new TreeMap<>(); + map.put("address", getAddress()); + return map; + } + + @Override + public boolean validate() { + boolean ret = super.validate(); + if ((getAddress() == null || getAddress().length() == 0) && + getStatus() != RouterServiceState.INITIALIZING) { + LOG.error("Invalid router entry, no address specified {}", this); + ret = false; + } + return ret; + } + + @Override + public int compareTo(BaseRecord other) { + if (other == null) { + return -1; + } else if (other instanceof RouterState) { + RouterState router = (RouterState) other; + return this.getAddress().compareTo(router.getAddress()); + } else { + return super.compareTo(other); + } + } + + @Override + public boolean checkExpired(long currentTime) { + if (super.checkExpired(currentTime)) { + setStatus(RouterServiceState.EXPIRED); + return true; + } + return false; + } + + @Override + public long getExpirationMs() { + return RouterState.expirationMs; + } + + public static void setExpirationMs(long time) { + RouterState.expirationMs = time; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/StateStoreVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/StateStoreVersion.java new file mode 100644 index 00000000000..ce86eb0600c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/StateStoreVersion.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.records; + +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * Entry to track the version of the State Store data stored in the State Store + * by a Router. + */ +public abstract class StateStoreVersion extends BaseRecord { + + public static StateStoreVersion newInstance() { + return StateStoreSerializer.newRecord(StateStoreVersion.class); + } + + public static StateStoreVersion newInstance(long membershipVersion, + long mountTableVersion) { + StateStoreVersion record = newInstance(); + record.setMembershipVersion(membershipVersion); + record.setMountTableVersion(mountTableVersion); + return record; + } + + public abstract long getMembershipVersion(); + + public abstract void setMembershipVersion(long version); + + public abstract long getMountTableVersion(); + + public abstract void setMountTableVersion(long version); + + @Override + public SortedMap getPrimaryKeys() { + // This record is not stored directly, no key needed + SortedMap map = new TreeMap(); + return map; + } + + @Override + public long getExpirationMs() { + // This record is not stored directly, no expiration needed + return -1; + } + + @Override + public void setDateModified(long time) { + // We don't store this record directly + } + + @Override + public long getDateModified() { + // We don't store this record directly + return 0; + } + + @Override + public void setDateCreated(long time) { + // We don't store this record directly + } + + @Override + public long getDateCreated() { + // We don't store this record directly + return 0; + } + + @Override + public String toString() { + return "Membership: " + getMembershipVersion() + + " Mount Table: " + getMountTableVersion(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java new file mode 100644 index 00000000000..aebc7a81f5b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.records.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterRecordProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterRecordProto.Builder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterRecordProtoOrBuilder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.StateStoreVersionRecordProto; +import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.FederationProtocolPBTranslator; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the RouterState record. + */ +public class RouterStatePBImpl extends RouterState implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator(RouterRecordProto.class); + + public RouterStatePBImpl() { + } + + public RouterStatePBImpl(RouterRecordProto proto) { + this.translator.setProto(proto); + } + + @Override + public RouterRecordProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public void setAddress(String address) { + RouterRecordProto.Builder builder = this.translator.getBuilder(); + if (address == null) { + builder.clearAddress(); + } else { + builder.setAddress(address); + } + } + + @Override + public String getAddress() { + RouterRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder(); + if (!proto.hasAddress()) { + return null; + } + return proto.getAddress(); + } + + @Override + public void setStateStoreVersion(StateStoreVersion version) { + RouterRecordProto.Builder builder = this.translator.getBuilder(); + if (version instanceof StateStoreVersionPBImpl) { + StateStoreVersionPBImpl versionPB = (StateStoreVersionPBImpl)version; + StateStoreVersionRecordProto versionProto = + (StateStoreVersionRecordProto)versionPB.getProto(); + builder.setStateStoreVersion(versionProto); + } else { + builder.clearStateStoreVersion(); + } + } + + @Override + public StateStoreVersion getStateStoreVersion() throws IOException { + RouterRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder(); + if (!proto.hasStateStoreVersion()) { + return null; + } + StateStoreVersionRecordProto versionProto = proto.getStateStoreVersion(); + StateStoreVersion version = + StateStoreSerializer.newRecord(StateStoreVersion.class); + if (version instanceof StateStoreVersionPBImpl) { + StateStoreVersionPBImpl versionPB = (StateStoreVersionPBImpl)version; + versionPB.setProto(versionProto); + return versionPB; + } else { + throw new IOException("Cannot get State Store version"); + } + } + + @Override + public RouterServiceState getStatus() { + RouterRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder(); + if (!proto.hasStatus()) { + return null; + } + return RouterServiceState.valueOf(proto.getStatus()); + } + + @Override + public void setStatus(RouterServiceState newStatus) { + RouterRecordProto.Builder builder = this.translator.getBuilder(); + if (newStatus == null) { + builder.clearStatus(); + } else { + builder.setStatus(newStatus.toString()); + } + } + + @Override + public String getBuildVersion() { + RouterRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder(); + if (!proto.hasBuildVersion()) { + return null; + } + return proto.getBuildVersion(); + } + + @Override + public void setBuildVersion(String version) { + RouterRecordProto.Builder builder = this.translator.getBuilder(); + if (version == null) { + builder.clearBuildVersion(); + } else { + builder.setBuildVersion(version); + } + } + + @Override + public String getCompileInfo() { + RouterRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder(); + if (!proto.hasCompileInfo()) { + return null; + } + return proto.getCompileInfo(); + } + + @Override + public void setCompileInfo(String info) { + RouterRecordProto.Builder builder = this.translator.getBuilder(); + if (info == null) { + builder.clearCompileInfo(); + } else { + builder.setCompileInfo(info); + } + } + + @Override + public void setDateStarted(long dateStarted) { + this.translator.getBuilder().setDateStarted(dateStarted); + } + + @Override + public long getDateStarted() { + return this.translator.getProtoOrBuilder().getDateStarted(); + } + + @Override + public void setDateModified(long time) { + this.translator.getBuilder().setDateModified(time); + } + + @Override + public long getDateModified() { + return this.translator.getProtoOrBuilder().getDateModified(); + } + + @Override + public void setDateCreated(long time) { + this.translator.getBuilder().setDateCreated(time); + } + + @Override + public long getDateCreated() { + return this.translator.getProtoOrBuilder().getDateCreated(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/StateStoreVersionPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/StateStoreVersionPBImpl.java new file mode 100644 index 00000000000..7696136ec78 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/StateStoreVersionPBImpl.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.records.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.StateStoreVersionRecordProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.StateStoreVersionRecordProto.Builder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.StateStoreVersionRecordProtoOrBuilder; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.FederationProtocolPBTranslator; +import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the StateStoreVersion record. + */ +public class StateStoreVersionPBImpl extends StateStoreVersion + implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator( + StateStoreVersionRecordProto.class); + + public StateStoreVersionPBImpl() { + } + + @Override + public StateStoreVersionRecordProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public long getMembershipVersion() { + return this.translator.getProtoOrBuilder().getMembershipVersion(); + } + + @Override + public void setMembershipVersion(long version) { + this.translator.getBuilder().setMembershipVersion(version); + } + + @Override + public long getMountTableVersion() { + return this.translator.getProtoOrBuilder().getMountTableVersion(); + } + + @Override + public void setMountTableVersion(long version) { + this.translator.getBuilder().setMountTableVersion(version); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto index 88acd087133..42ac357254c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto @@ -168,3 +168,47 @@ message GetMountTableEntriesResponseProto { optional uint64 timestamp = 2; } + +///////////////////////////////////////////////// +// Routers +///////////////////////////////////////////////// + +message StateStoreVersionRecordProto { + optional uint64 membershipVersion = 1; + optional uint64 mountTableVersion = 2; +} + +message RouterRecordProto { + optional uint64 dateCreated = 1; + optional uint64 dateModified = 2; + optional string address = 3; + optional string status = 4; + optional StateStoreVersionRecordProto stateStoreVersion = 5; + optional string buildVersion = 6; + optional string compileInfo = 7; + optional uint64 dateStarted = 8; +} + +message GetRouterRegistrationRequestProto { + optional string routerId = 1; +} + +message GetRouterRegistrationResponseProto { + optional RouterRecordProto router = 1; +} + +message GetRouterRegistrationsRequestProto { +} + +message GetRouterRegistrationsResponseProto { + optional uint64 timestamp = 1; + repeated RouterRecordProto routers = 2; +} + +message RouterHeartbeatRequestProto { + optional RouterRecordProto router = 1; +} + +message RouterHeartbeatResponseProto { + optional bool status = 1; +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java index 01fe1499943..1091c21e587 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java @@ -37,6 +37,7 @@ import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState; import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; @@ -44,6 +45,8 @@ import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.server.federation.store.records.Query; import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion; import org.junit.AfterClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,6 +97,10 @@ public class TestStateStoreDriverBase { return randomString; } + private long generateRandomLong() { + return RANDOM.nextLong(); + } + @SuppressWarnings("rawtypes") private T generateRandomEnum(Class enumClass) { int x = RANDOM.nextInt(enumClass.getEnumConstants().length); @@ -117,6 +124,12 @@ public class TestStateStoreDriverBase { Map destMap = Collections.singletonMap( generateRandomString(), "/" + generateRandomString()); return (T) MountTable.newInstance(src, destMap); + } else if (recordClass == RouterState.class) { + RouterState routerState = RouterState.newInstance(generateRandomString(), + generateRandomLong(), generateRandomEnum(RouterServiceState.class)); + StateStoreVersion version = generateFakeRecord(StateStoreVersion.class); + routerState.setStateStoreVersion(version); + return (T) routerState; } return null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java new file mode 100644 index 00000000000..76e0b962368 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.records; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.junit.Test; + +/** + * Test the Router State records. + */ +public class TestRouterState { + + private static final String ADDRESS = "address"; + private static final String BUILD_VERSION = "buildVersion"; + private static final String COMPILE_INFO = "compileInfo"; + private static final long START_TIME = 100; + private static final long DATE_MODIFIED = 200; + private static final long DATE_CREATED = 300; + private static final long FILE_RESOLVER_VERSION = 500; + private static final RouterServiceState STATE = RouterServiceState.RUNNING; + + + private RouterState generateRecord() throws IOException { + RouterState record = RouterState.newInstance(ADDRESS, START_TIME, STATE); + record.setBuildVersion(BUILD_VERSION); + record.setCompileInfo(COMPILE_INFO); + record.setDateCreated(DATE_CREATED); + record.setDateModified(DATE_MODIFIED); + + StateStoreVersion version = StateStoreVersion.newInstance(); + version.setMountTableVersion(FILE_RESOLVER_VERSION); + record.setStateStoreVersion(version); + return record; + } + + private void validateRecord(RouterState record) throws IOException { + assertEquals(ADDRESS, record.getAddress()); + assertEquals(START_TIME, record.getDateStarted()); + assertEquals(STATE, record.getStatus()); + assertEquals(COMPILE_INFO, record.getCompileInfo()); + assertEquals(BUILD_VERSION, record.getBuildVersion()); + + StateStoreVersion version = record.getStateStoreVersion(); + assertEquals(FILE_RESOLVER_VERSION, version.getMountTableVersion()); + } + + @Test + public void testGetterSetter() throws IOException { + RouterState record = generateRecord(); + validateRecord(record); + } + + @Test + public void testSerialization() throws IOException { + + RouterState record = generateRecord(); + + StateStoreSerializer serializer = StateStoreSerializer.getSerializer(); + String serializedString = serializer.serializeString(record); + RouterState newRecord = + serializer.deserialize(serializedString, RouterState.class); + + validateRecord(newRecord); + } +}