AP-Jupyternotebook.ipynb · master · Claudio Scheer / heuristic-plans …


Assignment 1: Planning using Heuristic Search

Felipe Meneguzzi
Mauricio Magnaguagno (PhD Student)
Leonardo Rosa Amado (PhD Student)
Gabriel Paludo Licks (MSc Student)
AI Planning (1982D-02):
  • Assigned: 14 May
  • Due: 14 June

Assignment Overview

The goal of this work is to implement the core functions of an automated planner and benchmark a number of heuristics. You will implement three main functions in this assignment:
  • Implement a number of heuristic functions
    • Max-Cost ($h^{max}$)
    • Additive Cost ($h^{add}$)
    • Relaxed Plan ($h^{FF}$)
    • Critical Path ($h^{m}$) (Optional)
    • Optimal Delete Relaxation ($h^{+}$) (Optional)
    • Landmark Heuristics ($h^{LM}$ and $h^{LM\text{-}cut}$) (Optional)
  • Implement a function capable of validating a plan given a domain and a problem.
  • Finally, implement the heuristic search A*
After implementing the required functions, you must write a 2-page paper. The entire package must be delivered using GitHub: your implemented functions must be contained in this Jupyter Notebook, and the paper must be committed as a separate PDF file in the same GitHub repository (the template is in the paper folder).

Experimentation

Grading

In order to properly evaluate your work and thought process, you will write a 2-page report in the AAAI two-column format explaining your encoding and experiments. These guidelines are to be followed exactly. Reports that have less than two pages of actual content, or that are not in the required format, will receive 0 marks for the report criterion. This report will be included in the deliverables of the assignment. The formatting instructions are available at Overleaf (AAAI Press). The report must have the following sections:
  • An introduction with your understanding of the problem domain, outlining the remainder of the paper;
  • Three sections explaining each part of your implementation (search, heuristic, and validator).
  • One experimentation section where you measure the performance of the planner using your action formalisation for each of the domains, on multiple problems.
  • One conclusion section, where you will summarise your experience in encoding planning domains and discuss the performance of the planner, and any limitations encountered in solving the problems you encoded.
Grading will consider elements of your encoding, experimentation and reporting of the work done. The criteria, as well as their weights in the final grade, are as follows:
  • Implementation (70%):
    • Heuristic functions (30%):
      • $h^{max}$ (10%);
      • $h^{add}$ (10%);
      • $h^{FF}$ (10%); and
      • Any other heuristic and tests (10% bonus).
    • Validator (10%);
    • Heuristic search (30%):
      • Correctness and optimality (20%); and
      • Runtime efficiency (10%).
  • Overall report readability (20%) — how accessible and coherent your explanation of your implementation is;
  • Code readability (10%).

Collaboration Policy

You must work on this project individually. You are free to discuss high-level design issues with the people in your class, but every aspect of your actual formalisation must be entirely your own work. Furthermore, there can be no textual similarities between the reports submitted by different students. Plagiarism, no matter the degree, will result in forfeiture of the entire grade of this assignment.

Sections

Auxiliary code

In this cell we provide two methods to aid you in your implementation. First, we provide a method to verify whether an action is applicable in a given state (all positive preconditions are contained in the state, and no negative preconditions are contained in the state). Second, we provide a method to apply an action in a given state, returning the resulting state.
In [14]:
def applicable(state, precondition):
    positive, negative = precondition 
    return positive.issubset(state) and not negative.intersection(state)

def apply(state, effects):
    positive, negative = effects
    return frozenset(state.difference(negative).union(positive))
In the next cell, we detail some of the usage of the given base code.
In [15]:
from pddl.action import Action

# Objects example

# An action to move robot r1 from location l1 to location l2
a1 = Action(
    'move', #Action name
    ['r1', 'l1', 'l2'], #Parameters
    frozenset({('at', 'r1', 'l1'), ('adjacent', 'l1', 'l2')}), #Positive preconditions
    frozenset({('occupied', 'l2')}), #Negative preconditions
    frozenset({('at', 'r1', 'l2'), ('occupied', 'l2')}), #Add effects
    frozenset({('at', 'r1', 'l1'), ('occupied', 'l1')})  #Del effects
)

# Get each element from the action
print(a1.name)
print(a1.parameters)
print(a1.positive_preconditions)
print(a1.negative_preconditions)
print(a1.add_effects)
print(a1.del_effects)

print('---------------------------------------------')

# The list of actions contains all possible actions
actions = [
    a1,
    # ...
]

# Only positive literals are present in the initial state
initial_state = frozenset({
    ('on', 'ca', 'pallet'),
    ('at', 'r1', 'l1'),
    ('belong', 'k1', 'l1'),
    ('adjacent', 'l1', 'l2'), ('adjacent', 'l2', 'l1'), ('attached', 'q2', 'l2'),
    ('empty', 'k2'),
    ('attached', 'p1', 'l1'), ('occupied', 'l1'),
    ('empty', 'k1'),
    # ...
})

# Goal literals are split in two, positive and negative
positive_goal = frozenset({('in', 'cb', 'p1'), ('in', 'ca', 'p1')})
negative_goal = frozenset()

#Test if the action move (variable a1) is applicable in our initial state (initial_state)
applicable_action = applicable(initial_state, (a1.positive_preconditions, a1.negative_preconditions))
print('Is the action move applicable?', applicable_action)

print('---------------------------------------------')

#Apply the action move in the initial state
resulting_state = apply(initial_state, (a1.add_effects, a1.del_effects))
print('Resulting state:')
for predicate in resulting_state:
    print(predicate)

print('---------------------------------------------')

#Test if the goal was achieved  
goal_achieved = applicable(resulting_state, (positive_goal, negative_goal))
print('Was the goal achieved?', goal_achieved)

print('---------------------------------------------')

# The output plan from the planner is either a list of actions or failure (None)
# An empty plan is valid
plan = []
# Preconditions and effects are empty when obtained from a plan file, may be filled when obtained from the planner
plan = [
    Action('take', ['k1', 'cc', 'cb', 'p1', 'l1'], [], [], [], []), 
    Action('load', ['k1', 'r1', 'cc', 'l1'], [], [], [], []),
    Action('move', ['r1', 'l1', 'l2'], [], [], [], []),
    Action('unload', ['k2', 'r1', 'cc', 'l2'], [], [], [], [])
    # ...
]
# Failure
plan = None

# A valid plan is either true or false
valid_plan   = True
invalid_plan = False
Out [15]:
move
['r1', 'l1', 'l2']
frozenset({('adjacent', 'l1', 'l2'), ('at', 'r1', 'l1')})
frozenset({('occupied', 'l2')})
frozenset({('occupied', 'l2'), ('at', 'r1', 'l2')})
frozenset({('occupied', 'l1'), ('at', 'r1', 'l1')})
---------------------------------------------
Is the action move applicable? True
---------------------------------------------
Resulting state:
('attached', 'q2', 'l2')
('adjacent', 'l1', 'l2')
('adjacent', 'l2', 'l1')
('attached', 'p1', 'l1')
('on', 'ca', 'pallet')
('occupied', 'l2')
('empty', 'k1')
('belong', 'k1', 'l1')
('at', 'r1', 'l2')
('empty', 'k2')
---------------------------------------------
Was the goal achieved? False
---------------------------------------------

Heuristics

Implement heuristic functions

Max-cost

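You will implement the Max-cost heuristic. Return the estimated distance between the current state $s$ and the goal $g$, a number between 0 (when $g \subseteq s$) and infinity (when $g$ is unreachable).
$$h^{max}(s, g) = \max_{g_i \in g} \begin{cases} 0 & \text{if } g_i \in s \\ \min \{ cost(a) + h^{max}(s, pre(a)) \mid a \in A \text{ and } g_i \in eff(a) \} & \text{otherwise} \end{cases}$$
where $A$ is the set of ground actions, $cost(a)$ is the cost of action $a$ ($1$ if not specified), $pre(a)$ is the set of preconditions of action $a$, and $eff(a)$ is the set of effects of action $a$. Your code must be contained in the h(self, actions, state, goals) function in the cell below. You can create additional functions (do not forget to comment the code intelligibly). h takes the following inputs: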
  • actions: list of ground actions
  • state: current state
  • goals: tuple with (positive predicates, negative predicates) of the goal
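As a rough illustration of the definition above (not the reference solution), the sketch below computes $h^{max}$ by iterating fact costs until nothing changes. The `Act` tuple, the function names and the toy facts are assumptions made for this example only; every action is assumed to cost 1, and negative preconditions and goals are ignored.

```python
from collections import namedtuple

# Hypothetical stand-in for a ground action (positive preconditions and add effects only).
Act = namedtuple("Act", ["name", "pre", "add"])

def fact_costs_max(actions, state):
    """Cheapest relaxed cost of each reachable fact, combining preconditions with max."""
    cost = {fact: 0 for fact in state}
    changed = True
    while changed:
        changed = False
        for a in actions:
            # An action is usable once all of its preconditions have a finite cost.
            if all(p in cost for p in a.pre):
                c = 1 + max([cost[p] for p in a.pre] or [0])
                for e in a.add:
                    if c < cost.get(e, float("inf")):
                        cost[e] = c
                        changed = True
    return cost

def h_max(actions, state, goals):
    cost = fact_costs_max(actions, state)
    # The most expensive goal fact determines the estimate.
    return max((cost.get(g, float("inf")) for g in goals), default=0)

toy_actions = [
    Act("a1", frozenset({"p"}), frozenset({"q"})),
    Act("a2", frozenset({"p"}), frozenset({"r"})),
    Act("a3", frozenset({"q", "r"}), frozenset({"g"})),
]
print(h_max(toy_actions, frozenset({"p"}), frozenset({"g"})))  # 2
```

In the toy problem both `q` and `r` cost 1, so `g` costs 1 + max(1, 1) = 2, even though any real plan needs three actions; this is why $h^{max}$ tends to underestimate.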

Relaxed Planning Graph

In this section, we provide a python implementation of how to build the Relaxed Planning Graph. You can use this implementation to obtain the max-cost heuristic, instead of the previous formula (with only a few modifications). Use wisely!
def build_rpg(actions, state, goals):
    i = 0
    factlevel = [state]
    actionlevel = []
    positive_goals = goals[0]
    while not positive_goals.issubset(factlevel[i]):
        # actionlevel[i]
        actionlevel.append([a for a in actions if a.positive_preconditions.issubset(factlevel[i])])
        # factlevel[i+1]
        factlevel.append(factlevel[i].union([pre for a in actionlevel[i] for pre in a.add_effects]))
        if factlevel[i+1] == factlevel[i]:
            return "G unreachable"
        i += 1
    return (factlevel, actionlevel)
In [16]:
from pddl.heuristic import Heuristic

class MaxHeuristic(Heuristic):
    #Positive goals and negative goals in a tuple
    def h(self, actions, state, goals):
        # YOUR CODE HERE
        reachable = state
        goals_missing = goals[0]
        max_cost = 0
        # Expand relaxed fact layers until every positive goal is reachable.
        while not goals_missing.issubset(reachable):
            # Actions whose positive preconditions hold in the current fact layer.
            applicable_actions = [
                a for a in actions if a.positive_preconditions.issubset(reachable)]
            # The next layer keeps every fact reached so far and adds the
            # add effects of all applicable actions (delete effects are ignored).
            new_reachable = reachable.union(
                [eff for a in applicable_actions for eff in a.add_effects]
            )
            # If the layer did not grow, no further facts can be reached,
            # so the goal is unreachable.
            if new_reachable == reachable:
                return float("inf")
            reachable = new_reachable
            max_cost += 1
        return max_cost

Additive

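You will implement the Additive heuristic. Return the estimated distance between the current state $s$ and the goal $g$, a number between 0 (when $g \subseteq s$) and infinity (when $g$ is unreachable).
$$h^{add}(s, g) = \sum_{g_i \in g} \begin{cases} 0 & \text{if } g_i \in s \\ \min \{ cost(a) + h^{add}(s, pre(a)) \mid a \in A \text{ and } g_i \in eff(a) \} & \text{otherwise} \end{cases}$$
where $A$ is the set of ground actions, $cost(a)$ is the cost of action $a$ ($1$ if not specified), $pre(a)$ is the set of preconditions of action $a$, and $eff(a)$ is the set of effects of action $a$. Your code must be contained in the h(self, actions, state, goals) function in the cell below. You can create additional functions (do not forget to comment the code intelligibly). h takes the following inputs: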
  • actions: list of ground actions
  • state: current state
  • goals: tuple with (positive predicates, negative predicates) of the goal
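To make the difference from Max-cost concrete, here is a minimal sketch of $h^{add}$ under the same simplifying assumptions (unit costs, positive preconditions only). The `Act` tuple and the toy problem are hypothetical and are not part of the provided `pddl` package.

```python
from collections import namedtuple

# Hypothetical stand-in for a ground action (positive preconditions and add effects only).
Act = namedtuple("Act", ["name", "pre", "add"])

def h_add(actions, state, goals):
    cost = {fact: 0 for fact in state}
    changed = True
    while changed:
        changed = False
        for a in actions:
            if all(p in cost for p in a.pre):
                # Precondition costs are summed, as if they were achieved independently.
                c = 1 + sum(cost[p] for p in a.pre)
                for e in a.add:
                    if c < cost.get(e, float("inf")):
                        cost[e] = c
                        changed = True
    # Goal costs are summed as well; an unreachable goal makes the sum infinite.
    return sum(cost.get(g, float("inf")) for g in goals)

toy_actions = [
    Act("a1", frozenset({"p"}), frozenset({"q"})),
    Act("a2", frozenset({"q"}), frozenset({"g1"})),
    Act("a3", frozenset({"q"}), frozenset({"g2"})),
]
print(h_add(toy_actions, frozenset({"p"}), frozenset({"g1", "g2"})))  # 4
```

The toy goal can be reached with three actions (`a1`, `a2`, `a3`), yet the estimate is 4 because the cost of `q` is counted once per goal. This double counting is what makes $h^{add}$ inadmissible.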
In [17]:
class AdditiveHeuristic(Heuristic):

    def h(self, actions, state, goals):
        # YOUR CODE HERE
        reachable = state
        goals_missing = goals[0]
        goals_reached = None
        last_state = None
        add = 0
        # The cost to reach goals that are in the initial state is zero.
        costs = {p: 0 for p in state}
        while last_state != reachable:
            goals_reached = goals_missing.intersection(reachable)
            if goals_reached:
                # Sum the costs of the reached goals.
                add += sum(costs[g] for g in goals_reached)
                goals_missing = goals_missing.difference(goals_reached)
            if not goals_missing:
                return add

            last_state = reachable

            for action in actions:
                # Test whether preconditions of the actions are in the state.
                if action.positive_preconditions.issubset(last_state):
                    new_reachable = action.add_effects.difference(reachable)
                    # Assign a cost to each fact that this action reaches for the first time.
                    for effect in new_reachable:
                        # Cost of the fact: sum of the precondition costs plus 1 for this action.
                        costs[effect] = (
                            sum(costs[pre]
                                for pre in action.positive_preconditions) + 1
                        )
                    reachable = reachable.union(new_reachable)
        return float("inf")

Relaxed Plan

You will implement the Relaxed Plan (a.k.a. Fast Forward) heuristic. Return the estimated distance between the current state $s$ and the goal $g$, a number between 0 (when $g \subseteq s$) and infinity (when $g$ is unreachable).
Notice that I've already implemented the code to build the best supporter for each predicate. The actual heuristic consists of computing a relaxed plan by backchaining from the goals via their best supporters and returning the size of the resulting relaxed plan.
Your code must be contained in the h(self, actions, state, goals) function in the cell below. You can create additional functions (do not forget to comment the code intelligibly). h takes the following inputs:
  • actions: list of ground actions
  • state: current state
  • goals: tuple with (positive predicates, negative predicates) of the goal
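The backchaining step described above can be sketched on its own, independently of the `bs_table` code in the next cell. The sketch assumes unit costs and positive preconditions only; `Act`, `best_supporters` and `h_ff` are names invented for this example.

```python
from collections import namedtuple

# Hypothetical stand-in for a ground action (positive preconditions and add effects only).
Act = namedtuple("Act", ["name", "pre", "add"])

def best_supporters(actions, state):
    """Additive fact costs plus, for each fact, the cheapest action that adds it."""
    cost = {fact: 0 for fact in state}
    supporter = {}
    changed = True
    while changed:
        changed = False
        for a in actions:
            if all(p in cost for p in a.pre):
                c = 1 + sum(cost[p] for p in a.pre)
                for e in a.add:
                    if c < cost.get(e, float("inf")):
                        cost[e], supporter[e] = c, a
                        changed = True
    return cost, supporter

def h_ff(actions, state, goals):
    cost, supporter = best_supporters(actions, state)
    if any(g not in cost for g in goals):
        return float("inf")
    relaxed_plan = set()
    seen = set(goals)
    open_facts = list(goals)
    while open_facts:
        a = supporter.get(open_facts.pop())
        if a is None:  # the fact already holds in the state
            continue
        relaxed_plan.add(a)  # a set, so shared supporters are counted once
        for p in a.pre:
            if p not in seen:
                seen.add(p)
                open_facts.append(p)
    return len(relaxed_plan)

toy_actions = [
    Act("a1", frozenset({"p"}), frozenset({"q"})),
    Act("a2", frozenset({"q"}), frozenset({"g1"})),
    Act("a3", frozenset({"q"}), frozenset({"g2"})),
]
print(h_ff(toy_actions, frozenset({"p"}), frozenset({"g1", "g2"})))  # 3
```

Because the relaxed plan is a set of actions, `a1` is counted once even though both goals depend on it, so the estimate is 3 where an additive sum over the same goals would give 4.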
In [18]:
class FastForwardHeuristic(Heuristic):

    def build_bs_table(self, actions, initial_state, goal):
        self.empty_action = Action('nop', frozenset(), frozenset(), frozenset(), frozenset(), frozenset())
        self.bs_table = dict()
        return self.update_bs_table(actions, initial_state, goal)

    def update_bs_table(self, actions, initial_state, goal):
        positive_g, negative_g = goal
        if not positive_g:
            return 0
        reachable = set(initial_state)
        missing_positive_g = set(positive_g)
        last_state = None
        t_add = {p: 0 for p in initial_state}  # Everything in the initial state costs 0
        add = 0
        while last_state != reachable:
            g_reached = missing_positive_g.intersection(reachable)
            if g_reached:
                add += sum(t_add[g] for g in g_reached)
                missing_positive_g -= g_reached
                if not missing_positive_g:
                    return add
            last_state = set(reachable)
            for a in actions:
                if a.positive_preconditions <= last_state:
                    new_reachable = a.add_effects - reachable
                    for eff in new_reachable:
                        if eff in t_add:
                            old_t_add = t_add[eff]
                            t_add[eff] = min(sum(t_add[pre] for pre in a.positive_preconditions)+1, t_add[eff])
                            if t_add[eff] != old_t_add:
                                self.bs_table[eff] = a  # best supporter changed
                        else:
                            t_add[eff] = sum(t_add[pre] for pre in a.positive_preconditions)+1
                            self.bs_table[eff] = a
                    reachable.update(new_reachable)
        return float("inf")

    def best_supporter(self, actions, initial_state, g):
        if g not in self.bs_table.keys():
            return self.empty_action
        return self.bs_table[g]

    def h(self, actions, initial_state, goal):
        # Build best supporter (I've done this for you)
        add = self.build_bs_table(actions, initial_state, goal)
        if add == 0:
            return 0
        elif add == float("inf"):
            return float("inf")

        r_plan = set()
        actions_already_explored = set()
        actions_to_explore = []

        for g in goal[0]:
            actions_to_explore.append(
                self.best_supporter(actions, initial_state, g))
            actions_already_explored.add(g)

        while actions_to_explore:
            action = actions_to_explore.pop()
            if action.name != "nop" and action not in r_plan:
                for precondition in action.positive_preconditions:
                    if precondition not in actions_already_explored:
                        actions_to_explore.append(self.best_supporter(
                            actions, initial_state, precondition))
                        actions_already_explored.add(precondition)
                r_plan.add(action)

        return len(r_plan)

Test the heuristic functions

We will test the heuristics using five different domains: dinner, tsp, dwr, dompteur, and logistics. The state used is the initial state of each problem.
At each execution we show the expected value for the heuristic.
In [19]:
from pddl.pddl_parser import PDDLParser
from pddl.action import Action

# The following should be visible to the students
# Load some domain and some problem
dwr = "examples/dwr/dwr.pddl"
pb1_dwr = "examples/dwr/pb1.pddl"
pb2_dwr = "examples/dwr/pb2.pddl"

tsp = "examples/tsp/tsp.pddl"
pb1_tsp = "examples/tsp/pb1.pddl"
pb2_tsp = "examples/tsp/pb2.pddl"

dinner = "examples/dinner/dinner.pddl"
pb1_dinner = "examples/dinner/pb1.pddl"

dompteur = "examples/dompteur/dompteur.pddl"
pb1_dompteur = "examples/dompteur/pb1.pddl"

logistics = "examples/logistics/logistics.pddl"
pb1_logistics = "examples/logistics/pb1.pddl"
pb2_logistics = "examples/logistics/pb2.pddl"

def parse_domain_problem(domain, problem):
    parser = PDDLParser()
    parser.parse_domain(domain)
    parser.parse_problem(problem)
    # Grounding process
    actions = []
    for action in parser.actions:
        for act in action.groundify(parser.objects):
            actions.append(act)
    return parser, actions

def test_heuristic(domain, problem, h, expected):
    parser, actions = parse_domain_problem(domain, problem)
    v = h.h(actions, parser.state, (parser.positive_goals, parser.negative_goals))
    print("Expected " + str(expected) + ", got:", str(v) + ('. Correct!' if v == expected else '. False!'))

# Apply Hmax to initial states of many problems from many domains
h = MaxHeuristic()
test_heuristic(dwr, pb1_dwr, h, 6)
test_heuristic(dwr, pb2_dwr, h, 0)
test_heuristic(tsp, pb1_tsp, h, 2)
test_heuristic(tsp, pb2_tsp, h, 2)
test_heuristic(dinner, pb1_dinner, h, 1)
test_heuristic(dompteur, pb1_dompteur, h, 2)
test_heuristic(logistics, pb1_logistics, h, 4)
test_heuristic(logistics, pb2_logistics, h, 4)

h = AdditiveHeuristic()
test_heuristic(dwr, pb1_dwr, h, 38)
test_heuristic(dwr, pb2_dwr, h, 0)
test_heuristic(tsp, pb1_tsp, h, 8)
test_heuristic(tsp, pb2_tsp, h, 8)
test_heuristic(dinner, pb1_dinner, h, 2)
test_heuristic(dompteur, pb1_dompteur, h, 2)
test_heuristic(logistics, pb1_logistics, h, 7)
test_heuristic(logistics, pb2_logistics, h, 10)

h = FastForwardHeuristic()
test_heuristic(dwr, pb1_dwr, h, 16)
test_heuristic(dwr, pb2_dwr, h, 0)
test_heuristic(tsp, pb1_tsp, h, 5)
test_heuristic(tsp, pb2_tsp, h, 5)
test_heuristic(dinner, pb1_dinner, h, 2)
test_heuristic(dompteur, pb1_dompteur, h, 2)
test_heuristic(logistics, pb1_logistics, h, 5)
test_heuristic(logistics, pb2_logistics, h, 5)
Out [19]:
Expected 6, got: 6. Correct!
Expected 0, got: 0. Correct!
Expected 2, got: 2. Correct!
Expected 2, got: 2. Correct!
Expected 1, got: 1. Correct!
Expected 2, got: 2. Correct!
:constants is not recognized in domain
Expected 4, got: 4. Correct!
:constants is not recognized in domain
Expected 4, got: 4. Correct!
Expected 38, got: 38. Correct!
Expected 0, got: 0. Correct!
Expected 8, got: 8. Correct!
Expected 8, got: 8. Correct!
Expected 2, got: 2. Correct!
Expected 2, got: 2. Correct!
:constants is not recognized in domain
Expected 7, got: 7. Correct!
:constants is not recognized in domain
Expected 10, got: 10. Correct!
Expected 16, got: 16. Correct!
Expected 0, got: 0. Correct!
Expected 5, got: 5. Correct!
Expected 5, got: 5. Correct!
Expected 2, got: 2. Correct!
Expected 2, got: 2. Correct!
:constants is not recognized in domain
Expected 5, got: 5. Correct!
:constants is not recognized in domain
Expected 5, got: 5. Correct!
In [20]:
from nose.tools import assert_equal

dompteur = "examples/dompteur/dompteur.pddl"
pb1_dompteur = "examples/dompteur/pb1.pddl"

logistics = "examples/logistics/logistics.pddl"
pb1_logistics = "examples/logistics/pb1.pddl"
pb2_logistics = "examples/logistics/pb2.pddl"

def check_heuristic(domain, problem, h, expected):
    parser, actions = parse_domain_problem(domain, problem)
    v = h.h(actions, parser.state, (parser.positive_goals, parser.negative_goals))
    print("Expected " + str(expected) + ", got:", str(v) + ('. Correct!' if v == expected else '. False!'))
    assert_equal(expected, v)
h = MaxHeuristic()
h = AdditiveHeuristic()
h = FastForwardHeuristic()

Other Heuristics

You are free to implement any of the other heuristics seen in class for bonus points.
class DeleteRelaxationHeuristic(Heuristic):
    """Optimal Delete Relaxation Heuristic. 
    Please note, this heuristic is very slow, no matter how good your implementation is."""

    def h(self, actions, initial_state, goal):
        # YOUR CODE HERE
        raise NotImplementedError()
        return float("inf")

    
class CriticalPathHeuristic(Heuristic):
    """Haslum's H^m Heuristic"""

    def __init__(self, m, stats=None):
        super().__init__(stats)
        self.m = m
        self.facts_at = []
        self.mutexes_at = []
        self.all_facts = None
    
    def compute_all_facts(self, actions):
        self.all_facts = set()
        for a in actions:
            self.all_facts |= a.add_effects
            self.all_facts |= a.positive_preconditions
            self.all_facts |= a.del_effects  
            self.all_facts |= a.negative_preconditions  
        self.all_facts = frozenset(self.all_facts)

    # You can put any additional methods here, please erase the "raise NotImplementedError()" line below
    # YOUR CODE HERE
    raise NotImplementedError()

    def h(self, actions, initial_state, goal):
        if not self.all_facts:  # Cache all facts the first time this is called
            self.compute_all_facts(actions)
        
        # YOUR CODE HERE
        raise NotImplementedError()
    
    
class LMCutHeuristic(Heuristic):

    def h(self, actions, initial_state, goal):
        # YOUR CODE HERE
        raise NotImplementedError()
        return float("inf")
## TODO, do something here

Validator

Implement the validate function

You will now implement a validator capable of verifying whether a plan is valid for a specific domain and problem. The validator must return True if and only if the given plan is applicable and reaches the specified goal, and False if the plan itself is not applicable or the given plan does not achieve the specified goal. Your code must be contained in the validate(self, actions, initial_state, goals, plan) function in the cell below. You can create additional functions (do not forget to comment the code intelligibly). validate takes the following inputs:
  • actions: list of ground actions
  • initial_state: initial state of the problem file
  • goals: tuple with (positive predicates, negative predicates) of the goal
  • plan: plan parsed from a plan trace
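The required behaviour can be sketched as a simple simulation: apply the plan step by step, fail as soon as a step is unknown or inapplicable, and test the goal at the end. In this sketch, `simulate` and the `ground` dictionary (mapping a step to its preconditions and effects) are assumptions made for illustration; `applicable` and `apply` are repeated from the auxiliary code so that the example runs on its own.

```python
def applicable(state, precondition):
    positive, negative = precondition
    return positive.issubset(state) and not negative.intersection(state)

def apply(state, effects):
    positive, negative = effects
    return frozenset(state.difference(negative).union(positive))

def simulate(ground, initial_state, goals, plan):
    """ground maps (name, parameters) to ((pos_pre, neg_pre), (add, delete))."""
    if plan is None:  # a failed search is never a valid plan
        return False
    state = initial_state
    for step in plan:
        if step not in ground:
            return False  # the step is not a ground action of the domain
        precondition, effects = ground[step]
        if not applicable(state, precondition):
            return False  # the plan breaks at this step
        state = apply(state, effects)
    # The goal is checked like a precondition of the final state.
    return applicable(state, goals)

move = ("move", ("r1", "l1", "l2"))
ground = {
    move: ((frozenset({("at", "r1", "l1")}), frozenset()),
           (frozenset({("at", "r1", "l2")}), frozenset({("at", "r1", "l1")}))),
}
init = frozenset({("at", "r1", "l1")})
goals = (frozenset({("at", "r1", "l2")}), frozenset())

print(simulate(ground, init, goals, [move]))        # True
print(simulate(ground, init, goals, []))            # False
print(simulate(ground, init, goals, [move, move]))  # False
```

The third call fails because the robot is no longer at `l1` when the second `move` is attempted, even though the goal held after the first step.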
In [21]:
from pddl.pddl_parser import PDDLParser
from pddl.action import Action

class Validator:

    def parse_plan(self, filename):
        with open(filename,'r') as f:
            plan = []
            for act in f.read().splitlines():
                act = act[1:-1].split()
                plan.append(Action(act[0], tuple(act[1:]), [], [], [], []))
            return plan

    def validate_file(self, domainfile, problemfile, planfile):
        return self.validate_plan(domainfile, problemfile, self.parse_plan(planfile))

    def validate_plan(self, domainfile, problemfile, plan):
        # Parser
        parser = PDDLParser()
        parser.parse_domain(domainfile)
        parser.parse_problem(problemfile)
        # Grounding process
        ground_actions = []
        for action in parser.actions:
            for act in action.groundify(parser.objects):
                ground_actions.append(act)
        return self.validate(ground_actions, parser.state, (parser.positive_goals, parser.negative_goals), plan)
    
    # =====================================
    # Params:
    # actions -> list of ground actions
    # initial_state -> initial state of the problem file
    # goals -> tuple with (positive predicates, negative predicates) of the goal
    # plan -> plan parsed from a plan trace
    # =====================================

    #Positive goals and negative goals in a tuple
    def validate(self, actions, initial_state, goals, plan):
        # YOUR CODE HERE
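        # A failed search (None) is never a valid plan.
        if plan is None:
            return False
        state = initial_state
        for step in plan:
            # Find the ground action with the same name and parameters as this step.
            action = next(
                (a for a in actions
                 if a.name == step.name
                 and tuple(a.parameters) == tuple(step.parameters)),
                None)
            # An unknown or inapplicable action makes the whole plan invalid.
            if action is None or not applicable(
                    state, (action.positive_preconditions,
                            action.negative_preconditions)):
                return False
            state = apply(state, (action.add_effects, action.del_effects))
        # The final state must satisfy both the positive and the negative goals.
        return applicable(state, goals)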

Test the validate function

In this test, we verify the correctness of the implemented validator using the dwr domain. Consider running more tests to ensure the correctness of the implemented function.
In [22]:
dwr = "examples/dwr/dwr.pddl"
pb1 = "examples/dwr/pb1.pddl"
plan1 = "examples/dwr/dwr_pb1_bfs.plan"
plan2 = "examples/dwr/dwr_pb1_heuristic.plan"
plan_empty = "examples/dwr/empty.plan"
val = Validator()
print("Expected True, got:", str(val.validate_file(dwr, pb1, plan1)))
print("Expected True, got:", str(val.validate_file(dwr, pb1, plan2)))
print("Expected False, got:", str(val.validate_file(dwr, pb1, plan_empty)))
Out [22]:
Expected True, got: True
Expected True, got: True
Expected False, got: False

Planner

Implement the planner solve function

You will implement the A* search. This search must use the implemented Max-cost heuristic. The search receives a domain PDDL file and a problem PDDL file (both are already parsed for you). The search must always return an optimal plan whenever the given problem has a solution in the specified domain. Your code must be contained in the solve(self, actions, state, goals) function (in the following cell). solve takes the following inputs:
  • actions: list of grounded actions
  • state: initial state of the problem file
  • goals: tuple with (positive predicates, negative predicates) of the goal
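For orientation, the following is a generic A* sketch over an abstract state space, not the planner itself. It assumes a hypothetical `successors` callback that yields `(action, next_state, step_cost)` triples and uses `heapq` with a counter as tie-breaker, so that states with equal priority are never compared with each other.

```python
import heapq
from itertools import count

def a_star(start, is_goal, successors, h):
    """Return a list of actions from start to a goal state, or None on failure."""
    tie = count()               # tie-breaker for entries with equal priority
    best_g = {start: 0}         # cheapest known path cost per state
    parent = {start: None}      # state -> (previous state, action taken)
    frontier = [(h(start), next(tie), start)]
    while frontier:
        _, _, s = heapq.heappop(frontier)
        if is_goal(s):
            plan = []
            while parent[s] is not None:
                s, action = parent[s]
                plan.append(action)
            return plan[::-1]
        for action, t, step_cost in successors(s):
            g = best_g[s] + step_cost
            # Keep t only if this path is cheaper than the best one known for t.
            if g < best_g.get(t, float("inf")):
                best_g[t] = g
                parent[t] = (s, action)
                heapq.heappush(frontier, (g + h(t), next(tie), t))
    return None

# Toy graph: the direct edge A->C costs 4, the detour through B costs 2.
edges = {
    "A": [("A->B", "B", 1), ("A->C", "C", 4)],
    "B": [("B->C", "C", 1)],
    "C": [],
}
plan = a_star("A", lambda s: s == "C", lambda s: edges[s], lambda s: 0)
print(plan)  # ['A->B', 'B->C']
```

Note that a rediscovered state is compared against its own best known cost (`best_g[t]`), not against the cost of the state being expanded; this is what lets the detour through `B` replace the more expensive direct edge.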
In [23]:
from pddl.pddl_planner import PDDLPlanner
import sys
import queue as queue

class HeuristicPlanner(PDDLPlanner):

    def __init__(self, heuristic=MaxHeuristic(), verbose=False, collect_stats=False):
        super().__init__(verbose,collect_stats)
        self.h = heuristic

    # -----------------------------------------------
    # Solve
    # -----------------------------------------------
    
    # =====================================
    # Params:
    # actions -> list of grounded actions
    # state -> initial state of the problem file
    # goals -> tuple with (positive predicates, negative predicates) of the goal
    # =====================================
    
    #Positive goals and negative goals in a tuple
    def solve(self, actions, state, goals):
        # YOUR CODE HERE
        frontier = queue.PriorityQueue()

        parent_state = {}
        cost_state = {}
        action_applied = {}

        parent_state[state] = None
        cost_state[state] = 0
        action_applied[state] = None

        frontier.put((0, state))

        while not frontier.empty():
            _, current_state = frontier.get()
            cost_current_state = cost_state[current_state]

            # Test whether the goals have been reached.
            if goals[0].issubset(current_state):
                # Get the backward path.
                path = []
                while action_applied[current_state]:
                    path.append(action_applied[current_state])
                    # current = parent
                    current_state = parent_state[current_state]
                path.reverse()
                return path

            for action in actions:
                # Get actions applicable to current state.
                if applicable(
                    current_state, (action.positive_preconditions,
                                    action.negative_preconditions),
                ):
                    new_state = apply(
                        current_state, (action.add_effects, action.del_effects))
                    heuristic_new_state = self.h(actions, new_state, goals)
                    if heuristic_new_state == float("inf"):
                        # Dead end: the goal is unreachable from this state.
                        continue
                    # Unit action costs: the path cost equals the depth in the search tree.
                    cost_new_state = cost_current_state + 1
                    # Add the new state to the frontier if it has never been visited or
                    # if this path is cheaper than the best path found to it so far.
                    if new_state not in cost_state or cost_new_state < cost_state[new_state]:
                        # The new state's priority is the cost to achieve the state plus the heuristic estimation to the goal.
                        priority = cost_new_state + heuristic_new_state
                        cost_state[new_state] = cost_new_state
                        parent_state[new_state] = current_state
                        action_applied[new_state] = action
                        # Add the new state to the frontier with the state's priority.
                        frontier.put((priority, new_state))
        return None

Test planner completeness and optimality

Here we perform a simple test to verify that the length of the plan found by your implementation is step-optimal. Please note that this test does not verify that the plan is valid (but we are going to test this). You can use your own implementation of the validator to verify this (highly recommended).
In [24]:
#Student_tests
dwr = "examples/dwr/dwr.pddl"
pb1 = "examples/dwr/pb1.pddl"
pb2 = "examples/dwr/pb2.pddl"
planner = HeuristicPlanner()

plan, time = planner.solve_file(dwr, pb1)
print("Expected 17, got:", str(len(plan)) + ('. Correct!' if len(plan) == 17 else '. False!'))
plan, time = planner.solve_file(dwr, pb2)
print("Expected 0, got:", str(len(plan)) + ('. Correct!' if len(plan) == 0 else '. False!'))
Out [24]:
Expected 17, got: 17. Correct!
Expected 0, got: 0. Correct!

Test planner output time

Here we will test the output time of the implemented search function. The maximum acceptable output time is 60 seconds for the given domains. Please consider that a good implementation should take less than 20 seconds (depending on the machine) for any of the given problems.
In [25]:
#Student_tests
dwr = "examples/dwr/dwr.pddl"
pb1 = "examples/dwr/pb1.pddl"
pb2 = "examples/dwr/pb2.pddl"
planner = HeuristicPlanner()

plan, time = planner.solve_file(dwr, pb1)
print("Elapsed time:", str(time) + (' Passed!' if time <= 60.0 else ' Timeout!'))

plan, time = planner.solve_file(dwr, pb2)
print("Elapsed time:", str(time) + (' Passed!' if time <= 60.0 else ' Timeout!'))
Out [25]:
Elapsed time: 9.877203226089478 Passed!
Elapsed time: 0 Passed!

Benchmark heuristic function (Optional)

We can now compare various planning strategies more objectively than just using runtime.
First, instrument your code with the following line immediately before the return statements of your search procedure (to collect the number of visited states):
if self.stats is not None: self.stats.nodes = len(visited)
Then run the code below to see how your planner compares to the simplest BFS.
from pddl.bfs_planner import BFS_Planner
# from pddl.heuristic_planner import HeuristicPlanner
# from pddl.delete_relaxation_h import MaxHeuristic, AdditiveHeuristic, FastForwardHeuristic
from pddl.benchmark import PlanningBenchmark, InstanceStats

import matplotlib.pyplot as plt

max_problems = 8

benchmark = PlanningBenchmark()

def run_heuristic_benchmark(benchmark, heuristic_class, max_problem=max_problems):
    benchmark.reset_stats()
    for i in range(1,max_problem):
        heuristic = heuristic_class(stats=benchmark.get_instance(domain_name='examples/blocksworld/blocksworld.pddl', problem_name='examples/blocksworld/pb%d.pddl'%i ))
        print("Running {!s} problem {!r}".format(heuristic.__class__.__name__,i))
        planner = HeuristicPlanner(heuristic=heuristic)
        planner.collect_benchmark = True
        planner.solve_file('examples/blocksworld/blocksworld.pddl','examples/blocksworld/pb%d.pddl'%i)
    
    hActions, hTime = benchmark.get_stats('examples/blocksworld/blocksworld.pddl',xaxis='action',stat='time', approach='Heuristic')
    hProblem, hTime = benchmark.get_stats('examples/blocksworld/blocksworld.pddl', xaxis='problem', stat='time', approach='Heuristic')
    hActions, hNodes = benchmark.get_stats('examples/blocksworld/blocksworld.pddl', xaxis='action', stat='nodes',approach='Heuristic')
    hActions, hCalls = benchmark.get_stats('examples/blocksworld/blocksworld.pddl', xaxis='action', stat='h_calls', approach='Heuristic')
    return hProblem, hActions, hTime, hNodes, hCalls


def plot_time(actions, times, labels):
    fig1 = plt.figure()
    ax1 = fig1.add_subplot()
    for i in range(len(actions)):
        ax1.plot(actions[i], times[i], label = labels[i])
    ax1.set_title("Time Performance")
    ax1.set_xlabel("Problem Actions")
    ax1.set_ylabel("Time (s)")
    ax1.legend(loc=2)
    plt.show()
    fig1.clear()
    plt.close(fig1)
    

def plot_expansions(actions, nodes, labels):
    fig2 = plt.figure()
    ax2 = fig2.add_subplot()
    for i in range(len(actions)):
        ax2.plot(actions[i], nodes[i], label=labels[i])
    ax2.set_title("Expanded Nodes")
    ax2.set_xlabel("Problem Actions")
    ax2.set_ylabel("Expanded Nodes")
    ax2.legend(loc=2)
    plt.show()
    fig2.clear()
    plt.close(fig2)

def plot_calls(actions, calls, labels):
    fig4 = plt.figure()
    ax4 = fig4.add_subplot()
    for i in range(len(actions)):
        ax4.plot(actions[i], calls[i], label=labels[i])
    ax4.set_title("Heuristic Function Calls")
    ax4.set_xlabel("Problem Actions")
    ax4.set_ylabel("H Calls")
    ax4.legend(loc=2)
    plt.show()
    fig4.clear()
    plt.close(fig4)

def bar_times(problems, times, labels):
    fig3 = plt.figure()
    ax3 = fig3.add_subplot()

    x = range(len(problems[0]))  # the label locations
    bar_width = .7/len(problems)  # the width of the bars
    bar_start = -.35

    rectses = []
    for i in range(len(problems)):
        rectses.append(ax3.bar([xv + bar_start + bar_width*i for xv in x], times[i], bar_width, label=labels[i]))
        
    # Add some text for labels, title and custom x-axis tick labels, etc.
    ax3.set_xlabel('Problem Name')
    ax3.set_ylabel('Time (s)')
    ax3.set_title('Problem')
    ax3.set_xticks(x)
    ax3.set_xticklabels(problems[0])
    ax3.legend()

    def autolabel(rects):
        """Attach a text label above each bar in *rects*, displaying its height."""
        for rect in rects:
            height = rect.get_height()
            ax3.annotate('{:.4f}'.format(height),
                        xy=(rect.get_x() + rect.get_width() / 2, height),  # horizontal centre of the bar
                        xytext=(0, 3),  # 3 points vertical offset
                        textcoords="offset points",
                        ha='center', va='bottom')
    
    for r in rectses:
        autolabel(r)

    fig3.tight_layout()
    plt.show()
    fig3.clear()
    plt.close(fig3)
    

for i in range(1, max_problems):
    planner = BFS_Planner()
    planner.collect_benchmark = True
    planner.solve_file('examples/blocksworld/blocksworld.pddl', 'examples/blocksworld/pb%d.pddl' % i)

# benchmark.plot_stat('../examples/blocksworld/blocksworld.pddl', xaxis='action', stat='time', approach='BFS')
# benchmark.plot_stat('../examples/blocksworld/blocksworld.pddl', xaxis='problem', stat='time', approach='BFS')

hActions = []
hTimes = []
hNodes = []
hCalls = []
hProblems = []
hApproaches = []

hActionsBFS, hTimeBFS = benchmark.get_stats('examples/blocksworld/blocksworld.pddl', xaxis='action', stat='time',
                                      approach='BFS')
hProblemBFS, hTimeBFS = benchmark.get_stats('examples/blocksworld/blocksworld.pddl', xaxis='problem', stat='time',
                                      approach='BFS')
hActionsBFS, hNodesBFS = benchmark.get_stats('examples/blocksworld/blocksworld.pddl', xaxis='action', stat='nodes',
                                       approach='BFS')
hActionsBFS, hCallsBFS = benchmark.get_stats('examples/blocksworld/blocksworld.pddl', xaxis='action',
                                       stat='h_calls', approach='BFS')

# hActions.append(hActionsBFS)
# hTimes.append(hTimeBFS)
# hNodes.append(hNodesBFS)
# hCalls.append(hCallsBFS)
# hProblems.append(hProblemBFS)
# hApproaches.append('BFS')

hProblemMax, hActionsMax, hTimeMax, hNodesMax, hCallsMax = run_heuristic_benchmark(benchmark, MaxHeuristic)
hActions.append(hActionsMax)
hTimes.append(hTimeMax)
hNodes.append(hNodesMax)
hCalls.append(hCallsMax)
hProblems.append(hProblemMax)
hApproaches.append('h_max')

hProblemAdd, hActionsAdd, hTimeAdd, hNodesAdd, hCallsAdd = run_heuristic_benchmark(benchmark, AdditiveHeuristic)
hActions.append(hActionsAdd)
hTimes.append(hTimeAdd)
hNodes.append(hNodesAdd)
hCalls.append(hCallsAdd)
hProblems.append(hProblemAdd)
hApproaches.append('h_add')

hProblemFF, hActionsFF, hTimeFF, hNodesFF, hCallsFF = run_heuristic_benchmark(benchmark, FastForwardHeuristic)
hActions.append(hActionsFF)
hTimes.append(hTimeFF)
hNodes.append(hNodesFF)
hCalls.append(hCallsFF)
hProblems.append(hProblemFF)
hApproaches.append('h_ff')

plot_time(hActions, hTimes, hApproaches)
plot_expansions(hActions, hNodes, hApproaches)
plot_calls(hActions, hCalls, hApproaches)
bar_times(hProblems, hTimes, hApproaches)

## Plot your additional heuristics here