In this tutorial, we will get to see more examples of ORM querying.
First, we will gather a lot of data. In order to achieve that we will write a randomized experiment for grasping a couple of objects. In the experiment the robot will try to grasp a randomized object using random poses and torso heights.
from tf import transformations
import itertools
import time
from typing import Optional, List, Tuple
import numpy as np
import sqlalchemy.orm
import tf
import tqdm
import pycram.orm.base
import pycram.task
from pycram.bullet_world import BulletWorld, Object as BulletWorldObject
from pycram.designators.action_designator import MoveTorsoAction, PickUpAction, NavigateAction, ParkArmsAction
from pycram.designators.object_designator import ObjectDesignatorDescription
import pycram.enums
from pycram.plan_failures import PlanFailure
from pycram.process_module import ProcessModule
from pycram.process_module import simulated_robot
import sqlalchemy.orm
# from pycram.resolver.location.jpt_location import JPTCostmapLocation
import pycram.orm
from pycram.orm.base import Position, RobotState
from pycram.orm.task import TaskTreeNode, Code
from pycram.orm.action_designator import PickUpAction as ORMPickUpAction
from pycram.orm.object_designator import Object
import sqlalchemy.sql
import pandas as pd
from pycram.pose import Pose
np.random.seed(420)
ProcessModule.execution_delay = False
pycram.orm.base.ProcessMetaData().description = "Tutorial for learning from experience in a Grasping action."
class GraspingExplorer:
"""Class to try randomized grasping plans."""
world: Optional[BulletWorld]
def __init__(self, robots: Optional[List[Tuple[str, str]]] = None, objects: Optional[List[Tuple[str, str]]] = None,
arms: Optional[List[str]] = None, grasps: Optional[List[str]] = None,
samples_per_scenario: int = 1000):
"""
Create a GraspingExplorer.
:param robots: The robots to use
:param objects: The objects to try to grasp
:param arms: The arms of the robot to use
:param grasps: The grasp orientations to use
:param samples_per_scenario: The number of tries per scenario.
"""
# store exploration space
if not robots:
self.robots: List[Tuple[str, str]] = [("pr2", "pr2.urdf")]
if not objects:
self.objects: List[Tuple[str, pycram.enums.ObjectType, str]] = [("cereal", pycram.enums.ObjectType.BREAKFAST_CEREAL, "breakfast_cereal.stl"),
("bowl", pycram.enums.ObjectType.BOWL, "bowl.stl"),
("milk", pycram.enums.ObjectType.MILK, "milk.stl"),
("spoon", pycram.enums.ObjectType.SPOON, "spoon.stl")]
if not arms:
self.arms: List[str] = ["left", "right"]
if not grasps:
self.grasps: List[str] = ["left", "right", "front", "top"]
# store trials per scenario
self.samples_per_scenario: int = samples_per_scenario
# chain hyperparameters
self.hyper_parameters = [self.robots, self.objects, self.arms, self.grasps]
self.total_tries = 0
self.total_failures = 0
def perform(self, session: sqlalchemy.orm.Session):
"""
Perform all experiments.
:param session: The database-session to insert the samples in.
"""
# create progress bar
progress_bar = tqdm.tqdm(
total=np.prod([len(p) for p in self.hyper_parameters]) * self.samples_per_scenario)
self.world = BulletWorld("DIRECT")
# for every robot
for robot, robot_urdf in self.robots:
# spawn it
robot = BulletWorldObject(robot, pycram.enums.ObjectType.ROBOT, robot_urdf)
# for every obj
for obj, obj_type, obj_stl in self.objects:
# spawn it
bw_object = BulletWorldObject(obj, obj_type, obj_stl, Pose([0, 0, 0.75], [0, 0, 0, 1]))
# create object designator
object_designator = ObjectDesignatorDescription(names=[obj])
# for every arm and grasp pose
for arm, grasp in itertools.product(self.arms, self.grasps):
# sample positions in 2D
positions = np.random.uniform([-2, -2], [2, 2], (self.samples_per_scenario, 2))
# for every position
for position in positions:
# set z axis to 0
position = [*position, 0]
# calculate orientation for robot to face the object
angle = np.arctan2(position[1], position[0]) + np.pi
orientation = list(transformations.quaternion_from_euler(0, 0, angle, axes="sxyz"))
# try to execute a grasping plan
with simulated_robot:
ParkArmsAction.Action(pycram.enums.Arms.BOTH).perform()
# navigate to sampled position
NavigateAction([Pose(position, orientation)]).resolve().perform()
# move torso
height = np.random.uniform(0., 0.33, 1)[0]
MoveTorsoAction.Action(height).perform()
# try to pick it up
try:
PickUpAction(object_designator, [arm], [grasp]).resolve().perform()
# if it fails
except PlanFailure:
# update failure stats
self.total_failures += 1
# reset BulletWorld
self.world.reset_bullet_world()
# update progress bar
self.total_tries += 1
# insert into database
pycram.task.task_tree.insert(session, use_progress_bar=False)
pycram.task.reset_tree()
progress_bar.update()
progress_bar.set_postfix(success_rate=(self.total_tries - self.total_failures) /
self.total_tries)
bw_object.remove()
robot.remove()
pybullet build time: May 20 2022 19:44:17 Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link'] Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame'] Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame'] Unknown attribute "type" in /robot[@name='pr2']/link[@name='laser_tilt_link'] [WARN] [1702467193.499071]: Could not import RoboKudo messages, RoboKudo interface could not be initialized [WARN] [1702467193.504366]: Failed to import Giskard messages
Next we have to establish a connection to a database and execute the experiment a couple of times. Note that the (few) number of samples we generate is only for demonstrations. For robust and reliable machine learning millions of samples are required.
engine = sqlalchemy.create_engine("sqlite+pysqlite:///:memory:")
session = sqlalchemy.orm.Session(bind=engine)
pycram.orm.base.Base.metadata.create_all(bind=engine)
session.commit()
explorer = GraspingExplorer(samples_per_scenario=30)
explorer.perform(session)
0%| | 0/960 [00:00<?, ?it/s]Unknown tag "material" in /robot[@name='plane']/link[@name='planeLink']/collision[1] Unknown tag "contact" in /robot[@name='plane']/link[@name='planeLink'] Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link'] Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame'] Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame'] Unknown attribute "type" in /robot[@name='pr2']/link[@name='laser_tilt_link'] Unknown tag "material" in /robot[@name='plane']/link[@name='planeLink']/collision[1] Unknown tag "contact" in /robot[@name='plane']/link[@name='planeLink'] Unknown attribute "type" in /robot[@name='pr2']/link[@name='base_laser_link'] Unknown attribute "type" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame'] Unknown attribute "type" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame'] Unknown attribute "type" in /robot[@name='pr2']/link[@name='laser_tilt_link'] 100%|██████████| 960/960 [01:41<00:00, 9.87it/s, success_rate=0.0854]
The success_rate of the output above indicates how many of our samples succeeded in trying to grasp a randomized object.
Now that we have data to query from and a running session, we can actually start creating queries. Let's say we want to select positions of robots that were able to grasp a specific object (in this case a "milk" object):
from pycram.enums import ObjectType
milk = BulletWorldObject("Milk", ObjectType.MILK, "milk.stl")
# query all relative robot positions in regard to an objects position
# make sure to order the joins() correctly
query = (session.query(ORMPickUpAction.arm, ORMPickUpAction.grasp, RobotState.torso_height, Position.x, Position.y)
.join(TaskTreeNode.code)
.join(Code.designator.of_type(ORMPickUpAction))
.join(ORMPickUpAction.robot_state)
.join(RobotState.pose)
.join(pycram.orm.base.Pose.position)
.join(ORMPickUpAction.object).filter(Object.type == milk.type)
.filter(TaskTreeNode.status == "SUCCEEDED"))
print(query)
df = pd.read_sql_query(query.statement, session.get_bind())
print(df)
SELECT "PickUpAction".arm AS "PickUpAction_arm", "PickUpAction".grasp AS "PickUpAction_grasp", "RobotState".torso_height AS "RobotState_torso_height", "Position".x AS "Position_x", "Position".y AS "Position_y" FROM "TaskTreeNode" JOIN "Code" ON "Code".id = "TaskTreeNode".code_id JOIN ("Designator" JOIN "Action" ON "Designator".id = "Action".id JOIN "PickUpAction" ON "Action".id = "PickUpAction".id) ON "Designator".id = "Code".designator_id JOIN "RobotState" ON "RobotState".id = "Action".robot_state_id JOIN "Pose" ON "Pose".id = "RobotState".pose_id JOIN "Position" ON "Position".id = "Pose".position_id JOIN "Object" ON "Object".id = "PickUpAction".object_id WHERE "Object".type = ? AND "TaskTreeNode".status = ? arm grasp torso_height x y 0 left left 0.198541 -0.679778 0.049746 1 left left 0.092868 0.473199 0.612112 2 left left 0.310885 0.202801 0.524949 3 left right 0.272878 0.012431 -0.405403 4 left right 0.275279 -0.438336 -0.327630 5 left front 0.192652 -0.252096 -0.630014 6 left front 0.308247 -0.302412 0.594757 7 left front 0.042853 -0.819815 -0.352528 8 left top 0.048944 0.277142 0.467083 9 left top 0.279301 -0.186137 -0.646486 10 right left 0.185506 0.348462 0.626915 11 right left 0.147901 0.619579 0.430394 12 right left 0.079778 0.289603 -0.414900 13 right left 0.187063 0.350363 -0.378484 14 right left 0.170081 -0.054975 0.764847 15 right right 0.098215 0.126765 -0.791066 16 right front 0.269773 0.237709 0.387341 17 right front 0.018371 0.217048 0.558395 18 right top 0.281445 -0.459514 0.528471 19 right top 0.051581 -0.164640 -0.629421
If you are not familiar with sqlalchemy querying you might wonder what the of_type() function does and why we needed it in this query:
In order to understand the importance of the of_type() function in the joins above it is crucial to understand the inheritance structure in the ORM package. The action necessary for this query is the PickUpAction. It inherits the Action class/table (which holds all the actions). The Action class itself on the other hand inherits Designator (which holds all the actions, but also all the motions). We started our joins by joining TaskTreeNode on its relationship to Code and Code on its relationship to Designator. Next table we need is the PickUpAction table, but there is no specified relationship between Designator and PickUpAction. But we do know that a PickUpAction is actually a Designator, meaning, it inherits from Designator. So we can just "tell" the join to join Code on every Designator, that is "of_type" PickUpAction (.join(Code.designator.of_type(ORMPickUpAction))). The effect of this function can also be seen in the printed query of above's output.
Another interesting query: Let's say we want to select the torso height and positions of robots relative to the object they were trying to grasp:
from pycram.orm.base import Pose as ORMPose
robot_pose = sqlalchemy.orm.aliased(ORMPose)
object_pose = sqlalchemy.orm.aliased(ORMPose)
robot_position = sqlalchemy.orm.aliased(Position)
object_position = sqlalchemy.orm.aliased(Position)
query = (session.query(TaskTreeNode.status, Object.type,
sqlalchemy.label("relative torso height", object_position.z - RobotState.torso_height),
sqlalchemy.label("x", robot_position.x - object_position.x),
sqlalchemy.label("y", robot_position.y - object_position.y))
.join(TaskTreeNode.code)
.join(Code.designator.of_type(ORMPickUpAction))
.join(ORMPickUpAction.robot_state)
.join(robot_pose, RobotState.pose)
.join(robot_position, robot_pose.position)
.join(ORMPickUpAction.object)
.join(object_pose, Object.pose)
.join(object_position, object_pose.position))
print(query)
df = pd.read_sql(query.statement, session.get_bind())
df["status"] = df["status"].apply(lambda x: str(x.name))
print(df)
SELECT "TaskTreeNode".status AS "TaskTreeNode_status", "Object".type AS "Object_type", "Position_1".z - "RobotState".torso_height AS "relative torso height", "Position_2".x - "Position_1".x AS x, "Position_2".y - "Position_1".y AS y FROM "TaskTreeNode" JOIN "Code" ON "Code".id = "TaskTreeNode".code_id JOIN ("Designator" JOIN "Action" ON "Designator".id = "Action".id JOIN "PickUpAction" ON "Action".id = "PickUpAction".id) ON "Designator".id = "Code".designator_id JOIN "RobotState" ON "RobotState".id = "Action".robot_state_id JOIN "Pose" AS "Pose_1" ON "Pose_1".id = "RobotState".pose_id JOIN "Position" AS "Position_2" ON "Position_2".id = "Pose_1".position_id JOIN "Object" ON "Object".id = "PickUpAction".object_id JOIN "Pose" AS "Pose_2" ON "Pose_2".id = "Object".pose_id JOIN "Position" AS "Position_1" ON "Position_1".id = "Pose_2".position_id status type relative torso height x \ 0 FAILED ObjectType.BREAKFAST_CEREAL 0.480005 -0.737416 1 FAILED ObjectType.BREAKFAST_CEREAL 0.668903 -0.932071 2 FAILED ObjectType.BREAKFAST_CEREAL 0.724878 1.472666 3 FAILED ObjectType.BREAKFAST_CEREAL 0.553403 -0.589925 4 SUCCEEDED ObjectType.BREAKFAST_CEREAL 0.690212 0.505402 .. ... ... ... ... 955 FAILED ObjectType.SPOON 0.546126 -1.220945 956 FAILED ObjectType.SPOON 0.664514 1.547123 957 FAILED ObjectType.SPOON 0.554894 1.256923 958 FAILED ObjectType.SPOON 0.493282 0.887036 959 FAILED ObjectType.SPOON 0.581754 -1.087740 y 0 -0.187877 1 -1.564287 2 0.518914 3 -1.729850 4 0.394643 .. ... 955 0.039673 956 1.356915 957 -0.207460 958 -0.605063 959 1.048258 [960 rows x 5 columns]
Obviously the query returned every row of the database since we didn't apply any filters.
Why is this query interesting? This query not only required more joins and the usage of the of_type() function, but we actually needed to access two of the tables twice with different purposes, namely the Pose and Position tables. We wanted to get the position of the robot relative to the object position, meaning we had to obtain all robot positions and all object positions. If we want to access the same table twice, we have to make sure to rename (one of) the occurrences in our query in order to provide proper sql syntax. This can be done by creating aliases using the sqlalchemy.orm.aliased() function. Sqlalchemy will automatically rename all the aliased tables for you during runtime.