Source code for machine_common_sense.reward

from typing import Dict, List, Optional

import typeguard
from shapely import geometry

from .goal_metadata import GoalCategory, GoalMetadata

GOAL_ACHIEVED = 1
GOAL_NOT_ACHIEVED = 0
STEP_PENALTY = 0.001
LAVA_PENALTY = 100.0
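# Illustrative note (not part of the original module): with the default
# constants above and the _adjust_score_penalty logic below, achieving the
# goal on step 50 scores 1 - (50 - 1) * 0.001 = 0.951, while a failed
# 100-step episode with 2 steps on lava scores
# 0 - 100 * 0.001 - 2 * 100.0 = -200.1.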


class Reward(object):
    '''Reward utility class'''

    @staticmethod
    def __get_object_from_list(objects: List[Dict],
                               target_id: str) -> Dict:
        '''
        Finds an object in a list. Uses a generator to return the first item
        or defaults to None if the target isn't found.

        Args:
            objects: list of object dictionaries
            target_id: str objectId of the object to find

        Returns:
            target: object dictionary if found or None
        '''
        return next((o for o in objects if o['objectId'] == target_id), None)

    @staticmethod
    def _convert_object_to_planar_polygon(
            goal_object: Dict) -> geometry.Polygon:
        '''
        Project goal object bounds (x,y,z) to an XZ planar polygon.

        Args:
            goal_object: dict

        Returns:
            polygons: shapely.geometry.Polygon
        '''
        bbox3d = goal_object['objectBounds']['objectBoundsCorners']
        # project to XZ plane
        xz_pts = [(pt['x'], pt['z']) for pt in bbox3d]
        return geometry.MultiPoint(xz_pts).convex_hull

    @staticmethod
    def _calc_retrieval_reward(
            goal: GoalMetadata,
            objects: Dict,
            agent: Dict,
            performer_reach: float,
            goal_reward: float = GOAL_ACHIEVED) -> float:
        '''
        Calculate the reward for the retrieval goal. The goal object must be
        in the agent's hand.

        Args:
            goal: GoalMetadata
            objects: Dict
            agent: Dict
            performer_reach: float
            goal_reward: float

        Returns:
            float: goal_reward (1 by default) if the goal was achieved,
            0 otherwise.
        '''
        reward = GOAL_NOT_ACHIEVED
        goal_objects = []
        metadata = goal.metadata or {}
        # Different goal categories may use different property names
        target_names = ['target', 'targets']
        # Get the total number of targets.
        pickup_number = metadata.get('pickup_number') or 0
        for target_name in target_names:
            # Some properties may be dicts, and some may be lists of dicts
            targets = metadata.get(target_name) or []
            targets = targets if isinstance(targets, list) else [targets]
            # If pickup_number was not defined, use the list's length.
            if not pickup_number:
                pickup_number = len(targets)
            for target in targets:
                goal_id = target.get('id')
                goal_object = Reward.__get_object_from_list(objects, goal_id)
                if goal_object:
                    goal_objects.append(goal_object)
        picked_up = len([
            object_metadata for object_metadata in goal_objects
            if object_metadata.get('wasPickedUp')
        ])
        # Attain the reward if the required number of targets were picked up.
        if goal_objects and pickup_number and picked_up >= pickup_number:
            reward = goal_reward
        return round(reward, 4)

    @staticmethod
    def _adjust_score_penalty(
            current_score: int,
            number_steps: int,
            steps_on_lava: int,
            lava_penalty: float = LAVA_PENALTY,
            step_penalty: float = STEP_PENALTY) -> float:
        '''
        Calculate the score penalty based on the number of steps. If the
        current step results in the goal being achieved, do not penalize
        the agent for that final step.

        Args:
            current_score: 1 or 0 depending on whether the reward was achieved
            number_steps: the current step count
            steps_on_lava: the number of total steps on lava
            lava_penalty: the point deduction for each step on lava
            step_penalty: the point deduction for each step

        Returns:
            float: new score based on the step penalty
        '''
        step_penalty = STEP_PENALTY if step_penalty is None else step_penalty
        lava_penalty = LAVA_PENALTY if lava_penalty is None else lava_penalty
        if current_score == 1:
            return current_score - ((number_steps - 1) * float(step_penalty))
        if steps_on_lava is None:
            steps_on_lava = 0
        return current_score - (number_steps * float(step_penalty)) - \
            (steps_on_lava * float(lava_penalty))

    @staticmethod
    def _calculate_default_reward(
            goal: GoalMetadata,
            objects: Dict,
            agent: Dict,
            reach: float,
            goal_reward: float = GOAL_ACHIEVED) -> float:
        '''Returns the default reward of 0; not achieved.'''
        return GOAL_NOT_ACHIEVED

    @staticmethod
    @typeguard.typechecked
    def calculate_reward(
            goal: Optional[GoalMetadata],
            objects: List[Dict],
            agent: Dict,
            number_steps: int,
            reach: Optional[float],
            steps_on_lava: Optional[int] = None,
            lava_penalty: Optional[float] = LAVA_PENALTY,
            step_penalty: Optional[float] = STEP_PENALTY,
            goal_reward: Optional[float] = GOAL_ACHIEVED) -> float:
        '''
        Determine if the agent achieved the goal.

        Args:
            goal: GoalMetadata
            objects: List[Dict]
            agent: Dict
            number_steps: int
            reach: float
            steps_on_lava: int
            lava_penalty: float
            step_penalty: float
            goal_reward: float

        Returns:
            float: goal_reward (1 by default) minus any step penalty if the
            goal was achieved; otherwise the step and lava penalties
            subtracted from 0.
        '''
        category = None
        if goal is not None and goal.metadata:
            category = goal.metadata.get('category', None)

        switch = {
            GoalCategory.IMITATION.value: Reward._calc_retrieval_reward,
            GoalCategory.RETRIEVAL.value: Reward._calc_retrieval_reward,
            GoalCategory.MULTI_RETRIEVAL.value: Reward._calc_retrieval_reward
        }

        current_score = switch.get(category, Reward._calculate_default_reward)(
            goal,
            objects,
            agent,
            reach,
            GOAL_ACHIEVED if goal_reward is None else goal_reward
        )

        return Reward._adjust_score_penalty(
            current_score,
            number_steps,
            steps_on_lava,
            lava_penalty,
            step_penalty
        )
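
# Minimal usage sketch (illustrative only; not part of the original module).
# It scores a one-step retrieval episode in which the target object has
# already been picked up. The goal/object dictionary shapes, the no-argument
# GoalMetadata() construction, and the metadata assignment below are
# assumptions inferred from the code above, not a documented API.
if __name__ == '__main__':
    example_goal = GoalMetadata()
    example_goal.metadata = {
        'category': GoalCategory.RETRIEVAL.value,
        'target': {'id': 'ball_1'}
    }
    example_objects = [{'objectId': 'ball_1', 'wasPickedUp': True}]
    score = Reward.calculate_reward(
        goal=example_goal,
        objects=example_objects,
        agent={},
        number_steps=1,
        reach=1.0
    )
    print(score)  # 1.0: goal achieved on step 1, so no step penalty applies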