*Planning* combines the two major areas of AI: search and logic. A planner can be seen either as a program that searches for a solution or as one that constructively proves the existence of a solution.
Currently, the most popular and effective approaches to fully automated planning are:
from planning import *
A planning graph is a directed graph organized into levels each of which contains information about the current state of the knowledge base and the possible state-action links to and from that level.
The first level contains the initial state with nodes representing each fluent that holds in that level. This level has state-action links linking each state to valid actions in that state. Each action is linked to all its preconditions and its effect states. Based on these effects, the next level is constructed and contains similarly structured information about the next state. In this way, the graph is expanded using state-action links till we reach a state where all the required goals hold true simultaneously.
In every planning problem, we are allowed to carry out the no-op action, ie, we can choose no action for a particular state. These are called persistence actions and has effects same as its preconditions. This enables us to carry a state to the next level.
Mutual exclusivity (mutex) between two actions means that these cannot be taken together and occurs in the following cases:
We can say that we have reached our goal if none of the goal states in the current level are mutually exclusive.
%psource Graph
class Graph: """ Contains levels of state and actions Used in graph planning algorithm to extract a solution """ def __init__(self, planning_problem): self.planning_problem = planning_problem self.kb = FolKB(planning_problem.initial) self.levels = [Level(self.kb)] self.objects = set(arg for clause in self.kb.clauses for arg in clause.args) def __call__(self): self.expand_graph() def expand_graph(self): """Expands the graph by a level""" last_level = self.levels[-1] last_level(self.planning_problem.actions, self.objects) self.levels.append(last_level.perform_actions()) def non_mutex_goals(self, goals, index): """Checks whether the goals are mutually exclusive""" goal_perm = itertools.combinations(goals, 2) for g in goal_perm: if set(g) in self.levels[index].mutex: return False return True
%psource Level
class Level: """ Contains the state of the planning problem and exhaustive list of actions which use the states as pre-condition. """ def __init__(self, kb): """Initializes variables to hold state and action details of a level""" self.kb = kb # current state self.current_state = kb.clauses # current action to state link self.current_action_links = {} # current state to action link self.current_state_links = {} # current action to next state link self.next_action_links = {} # next state to current action link self.next_state_links = {} # mutually exclusive actions self.mutex = [] def __call__(self, actions, objects): self.build(actions, objects) self.find_mutex() def separate(self, e): """Separates an iterable of elements into positive and negative parts""" positive = [] negative = [] for clause in e: if clause.op[:3] == 'Not': negative.append(clause) else: positive.append(clause) return positive, negative def find_mutex(self): """Finds mutually exclusive actions""" # Inconsistent effects pos_nsl, neg_nsl = self.separate(self.next_state_links) for negeff in neg_nsl: new_negeff = Expr(negeff.op[3:], *negeff.args) for poseff in pos_nsl: if new_negeff == poseff: for a in self.next_state_links[poseff]: for b in self.next_state_links[negeff]: if {a, b} not in self.mutex: self.mutex.append({a, b}) # Interference will be calculated with the last step pos_csl, neg_csl = self.separate(self.current_state_links) # Competing needs for pos_precond in pos_csl: for neg_precond in neg_csl: new_neg_precond = Expr(neg_precond.op[3:], *neg_precond.args) if new_neg_precond == pos_precond: for a in self.current_state_links[pos_precond]: for b in self.current_state_links[neg_precond]: if {a, b} not in self.mutex: self.mutex.append({a, b}) # Inconsistent support state_mutex = [] for pair in self.mutex: next_state_0 = self.next_action_links[list(pair)[0]] if len(pair) == 2: next_state_1 = self.next_action_links[list(pair)[1]] else: next_state_1 = self.next_action_links[list(pair)[0]] if (len(next_state_0) == 1) and (len(next_state_1) == 1): state_mutex.append({next_state_0[0], next_state_1[0]}) self.mutex = self.mutex + state_mutex def build(self, actions, objects): """Populates the lists and dictionaries containing the state action dependencies""" for clause in self.current_state: p_expr = Expr('P' + clause.op, *clause.args) self.current_action_links[p_expr] = [clause] self.next_action_links[p_expr] = [clause] self.current_state_links[clause] = [p_expr] self.next_state_links[clause] = [p_expr] for a in actions: num_args = len(a.args) possible_args = tuple(itertools.permutations(objects, num_args)) for arg in possible_args: if a.check_precond(self.kb, arg): for num, symbol in enumerate(a.args): if not symbol.op.islower(): arg = list(arg) arg[num] = symbol arg = tuple(arg) new_action = a.substitute(Expr(a.name, *a.args), arg) self.current_action_links[new_action] = [] for clause in a.precond: new_clause = a.substitute(clause, arg) self.current_action_links[new_action].append(new_clause) if new_clause in self.current_state_links: self.current_state_links[new_clause].append(new_action) else: self.current_state_links[new_clause] = [new_action] self.next_action_links[new_action] = [] for clause in a.effect: new_clause = a.substitute(clause, arg) self.next_action_links[new_action].append(new_clause) if new_clause in self.next_state_links: self.next_state_links[new_clause].append(new_action) else: self.next_state_links[new_clause] = [new_action] def perform_actions(self): """Performs the necessary actions and returns a new Level""" new_kb = FolKB(list(set(self.next_state_links.keys()))) return Level(new_kb)
A planning graph can be used to give better heuristic estimates which can be applied to any of the search techniques. Alternatively, we can search for a solution over the space formed by the planning graph, using an algorithm called GraphPlan
.
The GraphPlan
algorithm repeatedly adds a level to a planning graph. Once all the goals show up as non-mutex in the graph, the algorithm runs backward from the last level to the first searching for a plan that solves the problem. If that fails, it records the (level , goals) pair as a no-good (as in constraint learning for CSPs), expands another level and tries again, terminating with failure when there is no reason to go on.
%psource GraphPlan
class GraphPlan: """ Class for formulation GraphPlan algorithm Constructs a graph of state and action space Returns solution for the planning problem """ def __init__(self, planning_problem): self.graph = Graph(planning_problem) self.no_goods = [] self.solution = [] def check_leveloff(self): """Checks if the graph has levelled off""" check = (set(self.graph.levels[-1].current_state) == set(self.graph.levels[-2].current_state)) if check: return True def extract_solution(self, goals, index): """Extracts the solution""" level = self.graph.levels[index] if not self.graph.non_mutex_goals(goals, index): self.no_goods.append((level, goals)) return level = self.graph.levels[index - 1] # Create all combinations of actions that satisfy the goal actions = [] for goal in goals: actions.append(level.next_state_links[goal]) all_actions = list(itertools.product(*actions)) # Filter out non-mutex actions non_mutex_actions = [] for action_tuple in all_actions: action_pairs = itertools.combinations(list(set(action_tuple)), 2) non_mutex_actions.append(list(set(action_tuple))) for pair in action_pairs: if set(pair) in level.mutex: non_mutex_actions.pop(-1) break # Recursion for action_list in non_mutex_actions: if [action_list, index] not in self.solution: self.solution.append([action_list, index]) new_goals = [] for act in set(action_list): if act in level.current_action_links: new_goals = new_goals + level.current_action_links[act] if abs(index) + 1 == len(self.graph.levels): return elif (level, new_goals) in self.no_goods: return else: self.extract_solution(new_goals, index - 1) # Level-Order multiple solutions solution = [] for item in self.solution: if item[1] == -1: solution.append([]) solution[-1].append(item[0]) else: solution[-1].append(item[0]) for num, item in enumerate(solution): item.reverse() solution[num] = item return solution def goal_test(self, kb): return all(kb.ask(q) is not False for q in self.graph.planning_problem.goals) def execute(self): """Executes the GraphPlan algorithm for the given problem""" while True: self.graph.expand_graph() if (self.goal_test(self.graph.levels[-1].kb) and self.graph.non_mutex_goals( self.graph.planning_problem.goals, -1)): solution = self.extract_solution(self.graph.planning_problem.goals, -1) if solution: return solution if len(self.graph.levels) >= 2 and self.check_leveloff(): return None
The description of a planning problem defines a search problem: we can search from the initial state through the space of states, looking for a goal. One of the nice advantages of the declarative representation of action schemas is that we can also search backward from the goal, looking for the initial state.
However, neither forward nor backward search is efficient without a good heuristic function because the real-world planning problems often have large state spaces. A heuristic function $h(s)$ estimates the distance from a state $s$ to the goal and, if it is admissible, ie if does not overestimate, then we can use $A^∗$ search to find optimal solutions.
Planning uses a factored representation for states and action schemas which makes it possible to define good domain-independent heuristics to prune the search space.
An admissible heuristic can be derived by defining a relaxed problem that is easier to solve. The length of the solution of this easier problem then becomes the heuristic for the original problem. Assume that all goals and preconditions contain only positive literals, ie that the problem is defined according to the Stanford Research Institute Problem Solver (STRIPS) notation: we want to create a relaxed version of the original problem that will be easier to solve by ignoring delete lists from all actions, ie removing all negative literals from effects. As shown in [[1]](#cite-hoffmann2001ff) the planning graph of a relaxed problem does not contain any mutex relations at all (which is the crucial thing when building a planning graph) and for this reason GraphPlan will never backtrack looking for a solution: for this reason the ignore delete lists heuristic makes it possible to find the optimal solution for relaxed problem in polynomial time through GraphPlan
algorithm.
from search import *
Forward search through the space of states, starting in the initial state and using the problem’s actions to search forward for a member of the set of goal states.
%psource ForwardPlan
class ForwardPlan(search.Problem): """ [Section 10.2.1] Forward state-space search """ def __init__(self, planning_problem): super().__init__(associate('&', planning_problem.initial), associate('&', planning_problem.goals)) self.planning_problem = planning_problem self.expanded_actions = self.planning_problem.expand_actions() def actions(self, state): return [action for action in self.expanded_actions if all(pre in conjuncts(state) for pre in action.precond)] def result(self, state, action): return associate('&', action(conjuncts(state), action.args).clauses) def goal_test(self, state): return all(goal in conjuncts(state) for goal in self.planning_problem.goals) def h(self, state): """ Computes ignore delete lists heuristic by creating a relaxed version of the original problem (we can do that by removing the delete lists from all actions, i.e. removing all negative literals from effects) that will be easier to solve through GraphPlan and where the length of the solution will serve as a good heuristic. """ relaxed_planning_problem = PlanningProblem(initial=state.state, goals=self.goal, actions=[action.relaxed() for action in self.planning_problem.actions]) try: return len(linearize(GraphPlan(relaxed_planning_problem).execute())) except: return float('inf')
Backward search through sets of relevant states, starting at the set of states representing the goal and using the inverse of the actions to search backward for the initial state.
%psource BackwardPlan
class BackwardPlan(search.Problem): """ [Section 10.2.2] Backward relevant-states search """ def __init__(self, planning_problem): super().__init__(associate('&', planning_problem.goals), associate('&', planning_problem.initial)) self.planning_problem = planning_problem self.expanded_actions = self.planning_problem.expand_actions() def actions(self, subgoal): """ Returns True if the action is relevant to the subgoal, i.e.: - the action achieves an element of the effects - the action doesn't delete something that needs to be achieved - the preconditions are consistent with other subgoals that need to be achieved """ def negate_clause(clause): return Expr(clause.op.replace('Not', ''), *clause.args) if clause.op[:3] == 'Not' else Expr( 'Not' + clause.op, *clause.args) subgoal = conjuncts(subgoal) return [action for action in self.expanded_actions if (any(prop in action.effect for prop in subgoal) and not any(negate_clause(prop) in subgoal for prop in action.effect) and not any(negate_clause(prop) in subgoal and negate_clause(prop) not in action.effect for prop in action.precond))] def result(self, subgoal, action): # g' = (g - effects(a)) + preconds(a) return associate('&', set(set(conjuncts(subgoal)).difference(action.effect)).union(action.precond)) def goal_test(self, subgoal): return all(goal in conjuncts(self.goal) for goal in conjuncts(subgoal)) def h(self, subgoal): """ Computes ignore delete lists heuristic by creating a relaxed version of the original problem (we can do that by removing the delete lists from all actions, i.e. removing all negative literals from effects) that will be easier to solve through GraphPlan and where the length of the solution will serve as a good heuristic. """ relaxed_planning_problem = PlanningProblem(initial=self.goal, goals=subgoal.state, actions=[action.relaxed() for action in self.planning_problem.actions]) try: return len(linearize(GraphPlan(relaxed_planning_problem).execute())) except: return float('inf')
In forward planning, the search is constrained by the initial state and only uses the goal as a stopping criterion and as a source for heuristics. In regression planning, the search is constrained by the goal and only uses the start state as a stopping criterion and as a source for heuristics. By converting the problem to a constraint satisfaction problem (CSP), the initial state can be used to prune what is not reachable and the goal to prune what is not useful. The CSP will be defined for a finite number of steps; the number of steps can be adjusted to find the shortest plan. One of the CSP methods can then be used to solve the CSP and thus find a plan.
To construct a CSP from a planning problem, first choose a fixed planning horizon, which is the number of time steps over which to plan. Suppose the horizon is $k$. The CSP has the following variables:
There are several types of constraints:
The PDDL representation gives precondition, effect and frame constraints for each time $t$ as follows:
that specifies that if the action is to be $A$, $Var_t$ must have value $v$ immediately before. This constraint is violated when $Action_t = A$ and $Var_t \neq v$, and thus is equivalent to $\lnot{(Var_t \neq v \land Action_t = A)}$;
which is violated when $Action_t = A$ and $Var_{t+1} \neq v$, and thus is equivalent to $\lnot{(Var_{t+1} \neq v \land Action_t = A)}$;
which specifies that the feature $Var$ has the same value before and after any action that does not affect $Var$.
The CSP representation assumes a fixed planning horizon (ie a fixed number of steps). To find a plan over any number of steps, the algorithm can be run for a horizon of $k = 0, 1, 2, \dots$ until a solution is found.
from csp import *
%psource CSPlan
def CSPlan(planning_problem, solution_length, CSP_solver=ac_search_solver, arc_heuristic=sat_up): """ [Section 10.4.3] Planning as Constraint Satisfaction Problem """ def st(var, stage): """Returns a string for the var-stage pair that can be used as a variable""" return str(var) + "_" + str(stage) def if_(v1, v2): """If the second argument is v2, the first argument must be v1""" def if_fun(x1, x2): return x1 == v1 if x2 == v2 else True if_fun.__name__ = "if the second argument is " + str(v2) + " then the first argument is " + str(v1) + " " return if_fun def eq_if_not_in_(actset): """First and third arguments are equal if action is not in actset""" def eq_if_not_in(x1, a, x2): return x1 == x2 if a not in actset else True eq_if_not_in.__name__ = "first and third arguments are equal if action is not in " + str(actset) + " " return eq_if_not_in expanded_actions = planning_problem.expand_actions() fluent_values = planning_problem.expand_fluents() for horizon in range(solution_length): act_vars = [st('action', stage) for stage in range(horizon + 1)] domains = {av: list(map(lambda action: expr(str(action)), expanded_actions)) for av in act_vars} domains.update({st(var, stage): {True, False} for var in fluent_values for stage in range(horizon + 2)}) # initial state constraints constraints = [Constraint((st(var, 0),), is_(val)) for (var, val) in {expr(str(fluent).replace('Not', '')): True if fluent.op[:3] != 'Not' else False for fluent in planning_problem.initial}.items()] constraints += [Constraint((st(var, 0),), is_(False)) for var in {expr(str(fluent).replace('Not', '')) for fluent in fluent_values if fluent not in planning_problem.initial}] # goal state constraints constraints += [Constraint((st(var, horizon + 1),), is_(val)) for (var, val) in {expr(str(fluent).replace('Not', '')): True if fluent.op[:3] != 'Not' else False for fluent in planning_problem.goals}.items()] # precondition constraints constraints += [Constraint((st(var, stage), st('action', stage)), if_(val, act)) # st(var, stage) == val if st('action', stage) == act for act, strps in {expr(str(action)): action for action in expanded_actions}.items() for var, val in {expr(str(fluent).replace('Not', '')): True if fluent.op[:3] != 'Not' else False for fluent in strps.precond}.items() for stage in range(horizon + 1)] # effect constraints constraints += [Constraint((st(var, stage + 1), st('action', stage)), if_(val, act)) # st(var, stage + 1) == val if st('action', stage) == act for act, strps in {expr(str(action)): action for action in expanded_actions}.items() for var, val in {expr(str(fluent).replace('Not', '')): True if fluent.op[:3] != 'Not' else False for fluent in strps.effect}.items() for stage in range(horizon + 1)] # frame constraints constraints += [Constraint((st(var, stage), st('action', stage), st(var, stage + 1)), eq_if_not_in_(set(map(lambda action: expr(str(action)), {act for act in expanded_actions if var in act.effect or Expr('Not' + var.op, *var.args) in act.effect})))) for var in fluent_values for stage in range(horizon + 1)] csp = NaryCSP(domains, constraints) sol = CSP_solver(csp, arc_heuristic=arc_heuristic) if sol: return [sol[a] for a in act_vars]
As shown in [[2]](cite-kautz1992planning) the translation of a Planning Domain Definition Language (PDDL) description into a Conjunctive Normal Form (CNF) formula is a series of straightforward steps:
where $ActionCausesF$ is a disjunction of all the ground actions that have $F$ in their add list, and $ActionCausesNotF$ is a disjunction of all the ground actions that have $F$ in their delete list;
A propositional planning procedure implements the basic idea just given but, because the agent does not know how many steps it will take to reach the goal, the algorithm tries each possible number of steps $t$, up to some maximum conceivable plan length $T_{max}$ . In this way, it is guaranteed to find the shortest plan if one exists. Because of the way the propositional planning procedure searches for a solution, this approach cannot be used in a partially observable environment, ie WalkSAT, but would just set the unobservable variables to the values it needs to create a solution.
from logic import *
%psource SATPlan
def SATPlan(planning_problem, solution_length, SAT_solver=cdcl_satisfiable): """ [Section 10.4.1] Planning as Boolean satisfiability """ def expand_transitions(state, actions): state = sorted(conjuncts(state)) for action in filter(lambda act: act.check_precond(state, act.args), actions): transition[associate('&', state)].update( {Expr(action.name, *action.args): associate('&', sorted(set(filter(lambda clause: clause.op[:3] != 'Not', action(state, action.args).clauses)))) if planning_problem.is_strips() else associate('&', sorted(set(action(state, action.args).clauses)))}) for state in transition[associate('&', state)].values(): if state not in transition: expand_transitions(expr(state), actions) transition = defaultdict(dict) expand_transitions(associate('&', planning_problem.initial), planning_problem.expand_actions()) return SAT_plan(associate('&', sorted(planning_problem.initial)), transition, associate('&', sorted(planning_problem.goals)), solution_length, SAT_solver=SAT_solver)
%psource SAT_plan
def SAT_plan(init, transition, goal, t_max, SAT_solver=cdcl_satisfiable): """Converts a planning problem to Satisfaction problem by translating it to a cnf sentence. [Figure 7.22] >>> transition = {'A': {'Left': 'A', 'Right': 'B'}, 'B': {'Left': 'A', 'Right': 'C'}, 'C': {'Left': 'B', 'Right': 'C'}} >>> SAT_plan('A', transition, 'C', 1) is None True """ # Functions used by SAT_plan def translate_to_SAT(init, transition, goal, time): clauses = [] states = [state for state in transition] # Symbol claiming state s at time t state_counter = itertools.count() for s in states: for t in range(time + 1): state_sym[s, t] = Expr("S{}".format(next(state_counter))) # Add initial state axiom clauses.append(state_sym[init, 0]) # Add goal state axiom clauses.append(state_sym[first(clause[0] for clause in state_sym if set(conjuncts(clause[0])).issuperset(conjuncts(goal))), time]) \ if isinstance(goal, Expr) else clauses.append(state_sym[goal, time]) # All possible transitions transition_counter = itertools.count() for s in states: for action in transition[s]: s_ = transition[s][action] for t in range(time): # Action 'action' taken from state 's' at time 't' to reach 's_' action_sym[s, action, t] = Expr("T{}".format(next(transition_counter))) # Change the state from s to s_ clauses.append(action_sym[s, action, t] | '==>' | state_sym[s, t]) clauses.append(action_sym[s, action, t] | '==>' | state_sym[s_, t + 1]) # Allow only one state at any time for t in range(time + 1): # must be a state at any time clauses.append(associate('|', [state_sym[s, t] for s in states])) for s in states: for s_ in states[states.index(s) + 1:]: # for each pair of states s, s_ only one is possible at time t clauses.append((~state_sym[s, t]) | (~state_sym[s_, t])) # Restrict to one transition per timestep for t in range(time): # list of possible transitions at time t transitions_t = [tr for tr in action_sym if tr[2] == t] # make sure at least one of the transitions happens clauses.append(associate('|', [action_sym[tr] for tr in transitions_t])) for tr in transitions_t: for tr_ in transitions_t[transitions_t.index(tr) + 1:]: # there cannot be two transitions tr and tr_ at time t clauses.append(~action_sym[tr] | ~action_sym[tr_]) # Combine the clauses to form the cnf return associate('&', clauses) def extract_solution(model): true_transitions = [t for t in action_sym if model[action_sym[t]]] # Sort transitions based on time, which is the 3rd element of the tuple true_transitions.sort(key=lambda x: x[2]) return [action for s, action, time in true_transitions] # Body of SAT_plan algorithm for t in range(t_max + 1): # dictionaries to help extract the solution from model state_sym = {} action_sym = {} cnf = translate_to_SAT(init, transition, goal, t) model = SAT_solver(cnf) if model is not False: return extract_solution(model) return None
%psource three_block_tower
def three_block_tower(): """ [Figure 10.3] THREE-BLOCK-TOWER A blocks-world problem of stacking three blocks in a certain configuration, also known as the Sussman Anomaly. Example: >>> from planning import * >>> tbt = three_block_tower() >>> tbt.goal_test() False >>> tbt.act(expr('MoveToTable(C, A)')) >>> tbt.act(expr('Move(B, Table, C)')) >>> tbt.goal_test() False >>> tbt.act(expr('Move(A, Table, B)')) >>> tbt.goal_test() True >>> """ return PlanningProblem(initial='On(A, Table) & On(B, Table) & On(C, A) & Clear(B) & Clear(C)', goals='On(A, B) & On(B, C)', actions=[Action('Move(b, x, y)', precond='On(b, x) & Clear(b) & Clear(y)', effect='On(b, y) & Clear(x) & ~On(b, x) & ~Clear(y)', domain='Block(b) & Block(y)'), Action('MoveToTable(b, x)', precond='On(b, x) & Clear(b)', effect='On(b, Table) & Clear(x) & ~On(b, x)', domain='Block(b) & Block(x)')], domain='Block(A) & Block(B) & Block(C)')
%time blocks_world_solution = GraphPlan(three_block_tower()).execute()
linearize(blocks_world_solution)
CPU times: user 4.46 ms, sys: 124 µs, total: 4.59 ms Wall time: 4.48 ms
[MoveToTable(C, A), Move(B, Table, C), Move(A, Table, B)]
%time blocks_world_solution = uniform_cost_search(ForwardPlan(three_block_tower()), display=True).solution()
blocks_world_solution = list(map(lambda action: Expr(action.name, *action.args), blocks_world_solution))
blocks_world_solution
14 paths have been expanded and 28 paths remain in the frontier CPU times: user 91 ms, sys: 0 ns, total: 91 ms Wall time: 89.8 ms
[MoveToTable(C, A), Move(B, Table, C), Move(A, Table, B)]
%time blocks_world_solution = astar_search(ForwardPlan(three_block_tower()), display=True).solution()
blocks_world_solution = list(map(lambda action: Expr(action.name, *action.args), blocks_world_solution))
blocks_world_solution
3 paths have been expanded and 9 paths remain in the frontier CPU times: user 81.3 ms, sys: 3.11 ms, total: 84.5 ms Wall time: 83 ms
[MoveToTable(C, A), Move(B, Table, C), Move(A, Table, B)]
%time blocks_world_solution = uniform_cost_search(BackwardPlan(three_block_tower()), display=True).solution()
blocks_world_solution = list(map(lambda action: Expr(action.name, *action.args), blocks_world_solution))
blocks_world_solution[::-1]
116 paths have been expanded and 289 paths remain in the frontier CPU times: user 266 ms, sys: 718 µs, total: 267 ms Wall time: 265 ms
[MoveToTable(C, A), Move(B, Table, C), Move(A, Table, B)]
%time blocks_world_solution = astar_search(BackwardPlan(three_block_tower()), display=True).solution()
blocks_world_solution = list(map(lambda action: Expr(action.name, *action.args), blocks_world_solution))
blocks_world_solution[::-1]
4 paths have been expanded and 20 paths remain in the frontier CPU times: user 477 ms, sys: 450 µs, total: 477 ms Wall time: 476 ms
[MoveToTable(C, A), Move(B, Table, C), Move(A, Table, B)]
%time blocks_world_solution = CSPlan(three_block_tower(), 3, arc_heuristic=no_heuristic)
blocks_world_solution
CPU times: user 172 ms, sys: 4.52 ms, total: 176 ms Wall time: 175 ms
[MoveToTable(C, A), Move(B, Table, C), Move(A, Table, B)]
%time blocks_world_solution = CSPlan(three_block_tower(), 3, arc_heuristic=sat_up)
blocks_world_solution
CPU times: user 267 ms, sys: 0 ns, total: 267 ms Wall time: 266 ms
[MoveToTable(C, A), Move(B, Table, C), Move(A, Table, B)]
%time blocks_world_solution = SATPlan(three_block_tower(), 4, SAT_solver=dpll_satisfiable)
blocks_world_solution
CPU times: user 34.9 s, sys: 15.9 ms, total: 34.9 s Wall time: 34.9 s
[MoveToTable(C, A), Move(B, Table, C), Move(A, Table, B)]
%time blocks_world_solution = SATPlan(three_block_tower(), 4, SAT_solver=cdcl_satisfiable)
blocks_world_solution
CPU times: user 1.15 s, sys: 4.01 ms, total: 1.15 s Wall time: 1.15 s
[MoveToTable(C, A), Move(B, Table, C), Move(A, Table, B)]
%psource spare_tire
def spare_tire(): """ [Figure 10.2] SPARE-TIRE-PROBLEM A problem involving changing the flat tire of a car with a spare tire from the trunk. Example: >>> from planning import * >>> st = spare_tire() >>> st.goal_test() False >>> st.act(expr('Remove(Spare, Trunk)')) >>> st.act(expr('Remove(Flat, Axle)')) >>> st.goal_test() False >>> st.act(expr('PutOn(Spare, Axle)')) >>> st.goal_test() True >>> """ return PlanningProblem(initial='At(Flat, Axle) & At(Spare, Trunk)', goals='At(Spare, Axle) & At(Flat, Ground)', actions=[Action('Remove(obj, loc)', precond='At(obj, loc)', effect='At(obj, Ground) & ~At(obj, loc)', domain='Tire(obj)'), Action('PutOn(t, Axle)', precond='At(t, Ground) & ~At(Flat, Axle)', effect='At(t, Axle) & ~At(t, Ground)', domain='Tire(t)'), Action('LeaveOvernight', precond='', effect='~At(Spare, Ground) & ~At(Spare, Axle) & ~At(Spare, Trunk) & \ ~At(Flat, Ground) & ~At(Flat, Axle) & ~At(Flat, Trunk)')], domain='Tire(Flat) & Tire(Spare)')
%time spare_tire_solution = GraphPlan(spare_tire()).execute()
linearize(spare_tire_solution)
CPU times: user 4.24 ms, sys: 1 µs, total: 4.24 ms Wall time: 4.16 ms
[Remove(Flat, Axle), Remove(Spare, Trunk), PutOn(Spare, Axle)]
%time spare_tire_solution = uniform_cost_search(ForwardPlan(spare_tire()), display=True).solution()
spare_tire_solution = list(map(lambda action: Expr(action.name, *action.args), spare_tire_solution))
spare_tire_solution
11 paths have been expanded and 9 paths remain in the frontier CPU times: user 10.3 ms, sys: 0 ns, total: 10.3 ms Wall time: 9.89 ms
[Remove(Flat, Axle), Remove(Spare, Trunk), PutOn(Spare, Axle)]
%time spare_tire_solution = astar_search(ForwardPlan(spare_tire()), display=True).solution()
spare_tire_solution = list(map(lambda action: Expr(action.name, *action.args), spare_tire_solution))
spare_tire_solution
5 paths have been expanded and 8 paths remain in the frontier CPU times: user 20.4 ms, sys: 1 µs, total: 20.4 ms Wall time: 19.4 ms
[Remove(Flat, Axle), Remove(Spare, Trunk), PutOn(Spare, Axle)]
%time spare_tire_solution = uniform_cost_search(BackwardPlan(spare_tire()), display=True).solution()
spare_tire_solution = list(map(lambda action: Expr(action.name, *action.args), spare_tire_solution))
spare_tire_solution[::-1]
29 paths have been expanded and 22 paths remain in the frontier CPU times: user 22.2 ms, sys: 7 µs, total: 22.2 ms Wall time: 21.3 ms
[Remove(Flat, Axle), Remove(Spare, Trunk), PutOn(Spare, Axle)]
%time spare_tire_solution = astar_search(BackwardPlan(spare_tire()), display=True).solution()
spare_tire_solution = list(map(lambda action: Expr(action.name, *action.args), spare_tire_solution))
spare_tire_solution[::-1]
3 paths have been expanded and 11 paths remain in the frontier CPU times: user 13 ms, sys: 0 ns, total: 13 ms Wall time: 12.5 ms
[Remove(Spare, Trunk), Remove(Flat, Axle), PutOn(Spare, Axle)]
%time spare_tire_solution = CSPlan(spare_tire(), 3, arc_heuristic=no_heuristic)
spare_tire_solution
CPU times: user 94.7 ms, sys: 0 ns, total: 94.7 ms Wall time: 93.2 ms
[Remove(Spare, Trunk), Remove(Flat, Axle), PutOn(Spare, Axle)]
%time spare_tire_solution = CSPlan(spare_tire(), 3, arc_heuristic=sat_up)
spare_tire_solution
CPU times: user 119 ms, sys: 0 ns, total: 119 ms Wall time: 118 ms
[Remove(Spare, Trunk), Remove(Flat, Axle), PutOn(Spare, Axle)]
%time spare_tire_solution = SATPlan(spare_tire(), 4, SAT_solver=dpll_satisfiable)
spare_tire_solution
CPU times: user 9.01 s, sys: 3.98 ms, total: 9.01 s Wall time: 9.01 s
[Remove(Flat, Axle), Remove(Spare, Trunk), PutOn(Spare, Axle)]
%time spare_tire_solution = SATPlan(spare_tire(), 4, SAT_solver=cdcl_satisfiable)
spare_tire_solution
CPU times: user 630 ms, sys: 6 µs, total: 630 ms Wall time: 628 ms
[Remove(Spare, Trunk), Remove(Flat, Axle), PutOn(Spare, Axle)]
%psource shopping_problem
def shopping_problem(): """ SHOPPING-PROBLEM A problem of acquiring some items given their availability at certain stores. Example: >>> from planning import * >>> sp = shopping_problem() >>> sp.goal_test() False >>> sp.act(expr('Go(Home, HW)')) >>> sp.act(expr('Buy(Drill, HW)')) >>> sp.act(expr('Go(HW, SM)')) >>> sp.act(expr('Buy(Banana, SM)')) >>> sp.goal_test() False >>> sp.act(expr('Buy(Milk, SM)')) >>> sp.goal_test() True >>> """ return PlanningProblem(initial='At(Home) & Sells(SM, Milk) & Sells(SM, Banana) & Sells(HW, Drill)', goals='Have(Milk) & Have(Banana) & Have(Drill)', actions=[Action('Buy(x, store)', precond='At(store) & Sells(store, x)', effect='Have(x)', domain='Store(store) & Item(x)'), Action('Go(x, y)', precond='At(x)', effect='At(y) & ~At(x)', domain='Place(x) & Place(y)')], domain='Place(Home) & Place(SM) & Place(HW) & Store(SM) & Store(HW) & ' 'Item(Milk) & Item(Banana) & Item(Drill)')
%time shopping_problem_solution = GraphPlan(shopping_problem()).execute()
linearize(shopping_problem_solution)
CPU times: user 5.08 ms, sys: 3 µs, total: 5.08 ms Wall time: 5.03 ms
[Go(Home, HW), Go(Home, SM), Buy(Milk, SM), Buy(Drill, HW), Buy(Banana, SM)]
%time shopping_problem_solution = uniform_cost_search(ForwardPlan(shopping_problem()), display=True).solution()
shopping_problem_solution = list(map(lambda action: Expr(action.name, *action.args), shopping_problem_solution))
shopping_problem_solution
167 paths have been expanded and 257 paths remain in the frontier CPU times: user 187 ms, sys: 4.01 ms, total: 191 ms Wall time: 190 ms
[Go(Home, SM), Buy(Banana, SM), Buy(Milk, SM), Go(SM, HW), Buy(Drill, HW)]
%time shopping_problem_solution = astar_search(ForwardPlan(shopping_problem()), display=True).solution()
shopping_problem_solution = list(map(lambda action: Expr(action.name, *action.args), shopping_problem_solution))
shopping_problem_solution
9 paths have been expanded and 22 paths remain in the frontier CPU times: user 101 ms, sys: 3 µs, total: 101 ms Wall time: 100 ms
[Go(Home, SM), Buy(Banana, SM), Buy(Milk, SM), Go(SM, HW), Buy(Drill, HW)]
%time shopping_problem_solution = uniform_cost_search(BackwardPlan(shopping_problem()), display=True).solution()
shopping_problem_solution = list(map(lambda action: Expr(action.name, *action.args), shopping_problem_solution))
shopping_problem_solution[::-1]
176 paths have been expanded and 7 paths remain in the frontier CPU times: user 109 ms, sys: 2 µs, total: 109 ms Wall time: 107 ms
[Go(Home, HW), Buy(Drill, HW), Go(HW, SM), Buy(Milk, SM), Buy(Banana, SM)]
%time shopping_problem_solution = astar_search(BackwardPlan(shopping_problem()), display=True).solution()
shopping_problem_solution = list(map(lambda action: Expr(action.name, *action.args), shopping_problem_solution))
shopping_problem_solution[::-1]
18 paths have been expanded and 28 paths remain in the frontier CPU times: user 235 ms, sys: 9 µs, total: 235 ms Wall time: 234 ms
[Go(Home, SM), Buy(Banana, SM), Buy(Milk, SM), Go(SM, HW), Buy(Drill, HW)]
%time shopping_problem_solution = CSPlan(shopping_problem(), 5, arc_heuristic=no_heuristic)
shopping_problem_solution
CPU times: user 194 ms, sys: 6 µs, total: 194 ms Wall time: 192 ms
[Go(Home, HW), Buy(Drill, HW), Go(HW, SM), Buy(Banana, SM), Buy(Milk, SM)]
%time shopping_problem_solution = CSPlan(shopping_problem(), 5, arc_heuristic=sat_up)
shopping_problem_solution
CPU times: user 235 ms, sys: 7 µs, total: 235 ms Wall time: 233 ms
[Go(Home, HW), Buy(Drill, HW), Go(HW, SM), Buy(Banana, SM), Buy(Milk, SM)]
%time shopping_problem_solution = SATPlan(shopping_problem(), 5, SAT_solver=cdcl_satisfiable)
shopping_problem_solution
CPU times: user 1min 29s, sys: 36 ms, total: 1min 29s Wall time: 1min 29s
[Go(Home, HW), Buy(Drill, HW), Go(HW, SM), Buy(Banana, SM), Buy(Milk, SM)]
%psource air_cargo
def air_cargo(): """ [Figure 10.1] AIR-CARGO-PROBLEM An air-cargo shipment problem for delivering cargo to different locations, given the starting location and airplanes. Example: >>> from planning import * >>> ac = air_cargo() >>> ac.goal_test() False >>> ac.act(expr('Load(C2, P2, JFK)')) >>> ac.act(expr('Load(C1, P1, SFO)')) >>> ac.act(expr('Fly(P1, SFO, JFK)')) >>> ac.act(expr('Fly(P2, JFK, SFO)')) >>> ac.act(expr('Unload(C2, P2, SFO)')) >>> ac.goal_test() False >>> ac.act(expr('Unload(C1, P1, JFK)')) >>> ac.goal_test() True >>> """ return PlanningProblem(initial='At(C1, SFO) & At(C2, JFK) & At(P1, SFO) & At(P2, JFK)', goals='At(C1, JFK) & At(C2, SFO)', actions=[Action('Load(c, p, a)', precond='At(c, a) & At(p, a)', effect='In(c, p) & ~At(c, a)', domain='Cargo(c) & Plane(p) & Airport(a)'), Action('Unload(c, p, a)', precond='In(c, p) & At(p, a)', effect='At(c, a) & ~In(c, p)', domain='Cargo(c) & Plane(p) & Airport(a)'), Action('Fly(p, f, to)', precond='At(p, f)', effect='At(p, to) & ~At(p, f)', domain='Plane(p) & Airport(f) & Airport(to)')], domain='Cargo(C1) & Cargo(C2) & Plane(P1) & Plane(P2) & Airport(SFO) & Airport(JFK)')
%time air_cargo_solution = GraphPlan(air_cargo()).execute()
linearize(air_cargo_solution)
CPU times: user 9.06 ms, sys: 3 µs, total: 9.06 ms Wall time: 8.94 ms
[Load(C2, P2, JFK), Fly(P2, JFK, SFO), Load(C1, P1, SFO), Fly(P1, SFO, JFK), Unload(C1, P1, JFK), Unload(C2, P2, SFO)]
%time air_cargo_solution = uniform_cost_search(ForwardPlan(air_cargo()), display=True).solution()
air_cargo_solution = list(map(lambda action: Expr(action.name, *action.args), air_cargo_solution))
air_cargo_solution
838 paths have been expanded and 1288 paths remain in the frontier CPU times: user 3.56 s, sys: 4 ms, total: 3.57 s Wall time: 3.56 s
[Load(C2, P2, JFK), Fly(P2, JFK, SFO), Unload(C2, P2, SFO), Load(C1, P2, SFO), Fly(P2, SFO, JFK), Unload(C1, P2, JFK)]
%time air_cargo_solution = astar_search(ForwardPlan(air_cargo()), display=True).solution()
air_cargo_solution = list(map(lambda action: Expr(action.name, *action.args), air_cargo_solution))
air_cargo_solution
17 paths have been expanded and 54 paths remain in the frontier CPU times: user 716 ms, sys: 0 ns, total: 716 ms Wall time: 717 ms
[Load(C2, P2, JFK), Fly(P2, JFK, SFO), Unload(C2, P2, SFO), Load(C1, P2, SFO), Fly(P2, SFO, JFK), Unload(C1, P2, JFK)]
%time air_cargo_solution = uniform_cost_search(BackwardPlan(air_cargo()), display=True).solution()
air_cargo_solution = list(map(lambda action: Expr(action.name, *action.args), air_cargo_solution))
air_cargo_solution[::-1]
506 paths have been expanded and 65 paths remain in the frontier CPU times: user 970 ms, sys: 0 ns, total: 970 ms Wall time: 971 ms
[Load(C1, P1, SFO), Fly(P1, SFO, JFK), Load(C2, P1, JFK), Unload(C1, P1, JFK), Fly(P1, JFK, SFO), Unload(C2, P1, SFO)]
%time air_cargo_solution = astar_search(BackwardPlan(air_cargo()), display=True).solution()
air_cargo_solution = list(map(lambda action: Expr(action.name, *action.args), air_cargo_solution))
air_cargo_solution[::-1]
23 paths have been expanded and 50 paths remain in the frontier CPU times: user 1.19 s, sys: 2 µs, total: 1.19 s Wall time: 1.2 s
[Load(C2, P2, JFK), Fly(P2, JFK, SFO), Unload(C2, P2, SFO), Load(C1, P2, SFO), Fly(P2, SFO, JFK), Unload(C1, P2, JFK)]
%time air_cargo_solution = CSPlan(air_cargo(), 6, arc_heuristic=no_heuristic)
air_cargo_solution
CPU times: user 6.5 s, sys: 0 ns, total: 6.5 s Wall time: 6.51 s
[Load(C1, P1, SFO), Fly(P1, SFO, JFK), Load(C2, P1, JFK), Unload(C1, P1, JFK), Fly(P1, JFK, SFO), Unload(C2, P1, SFO)]
%time air_cargo_solution = CSPlan(air_cargo(), 6, arc_heuristic=sat_up)
air_cargo_solution
CPU times: user 13.6 s, sys: 7.98 ms, total: 13.7 s Wall time: 13.7 s
[Load(C1, P1, SFO), Fly(P1, SFO, JFK), Load(C2, P1, JFK), Unload(C1, P1, JFK), Fly(P1, JFK, SFO), Unload(C2, P1, SFO)]
[[1]](#ref-1) Hoffmann, Jörg. 2001. FF: The fast-forward planning system.
[[2]](#ref-2) Kautz, Henry A and Selman, Bart and others. 1992. Planning as Satisfiability.