search Module (search.py)


"""Search (Chapters 3-4)

The way to use this code is to subclass Problem to create a class of problems,
then create problem instances and solve them with calls to the various search
functions."""

from __future__ import generators
from utils import *
import math, random, sys, time, bisect, string


class Problem: """The abstract class for a formal problem. You should subclass this and implement the method successor, and possibly __init__, goal_test, and path_cost. Then you will create instances of your subclass and solve them with the various search functions.""" def __init__(self, initial, goal=None): """The constructor specifies the initial state, and possibly a goal state, if there is a unique goal. Your subclass's constructor can add other arguments.""" self.initial = initial; self.goal = goal def successor(self, state): """Given a state, return a sequence of (action, state) pairs reachable from this state. If there are many successors, consider an iterator that yields the successors one at a time, rather than building them all at once. Iterators will work fine within the framework.""" abstract() def goal_test(self, state): """Return True if the state is a goal. The default method compares the state to self.goal, as specified in the constructor. Implement this method if checking against a single self.goal is not enough.""" return state == self.goal def path_cost(self, c, state1, action, state2): """Return the cost of a solution path that arrives at state2 from state1 via action, assuming cost c to get up to state1. If the problem is such that the path doesn't matter, this function will only look at state2. If the path does matter, it will consider c and maybe state1 and action. The default method costs 1 for every step in the path.""" return c + 1
class Node: """A node in a search tree. Contains a pointer to the parent (the node that this is a successor of) and to the actual state for this node. Note that if a state is arrived at by two paths, then there are two nodes with the same state. Also includes the action that got us to this state, and the total path_cost (also known as g) to reach the node. Other functions may add an f and h value; see best_first_graph_search and astar_search for an explanation of how the f and h values are handled. You will not need to subclass this class.""" def __init__(self, state, parent=None, action=None, path_cost=0): "Create a search treee Node, derived from a parent by an action." self.state, self.parent, self.action, self.path_cost, = ( state, parent, action, path_cost) self.depth = if_(parent, parent.depth+1, 0) def __repr__(self): return "<Node %s>" % self.state def path(self): "Create a list of nodes from the root to this node." x, result = self, [self] while x.parent: result.append(x.parent) x = x.parent return result def expand(self, problem): "Return a list of nodes reachable from this node. [Fig. 3.8]" return [Node(next, self, act, problem.path_cost(self.path_cost, self.state, act, next)) for (act, next) in problem.successor(self.state)]
## Uninformed Search algorithms def tree_search(problem, fringe): """Search through the successors of a problem to find a goal. The argument fringe should be an empty queue. Don't worry about repeated paths to a state. [Fig. 3.8]""" fringe.append(Node(problem.initial)) while fringe: node = fringe.pop() if problem.goal_test(node.state): return node fringe.extend(node.expand(problem)) return None def breadth_first_tree_search(problem): "Search the shallowest nodes in the search tree first. [p 74]" return tree_search(problem, FIFOQueue()) def depth_first_tree_search(problem): "Search the deepest nodes in the search tree first. [p 74]" return tree_search(problem, Stack()) def graph_search(problem, fringe): """Search through the successors of a problem to find a goal. The argument fringe should be an empty queue. If two paths reach a state, only use the best one. [Fig. 3.18]""" closed = {} fringe.append(Node(problem.initial)) while fringe: node = fringe.pop() if problem.goal_test(node.state): return node if node.state not in closed: closed[node.state] = True fringe.extend(node.expand(problem)) return None def breadth_first_graph_search(problem): "Search the shallowest nodes in the search tree first. [p 74]" return graph_search(problem, FIFOQueue()) def depth_first_graph_search(problem): "Search the deepest nodes in the search tree first. [p 74]" return graph_search(problem, Stack()) def depth_limited_search(problem, limit=1000): "[Fig. 3.12]" def recursive_dls(node, problem, limit): cutoff_occurred = False if problem.goal_test(node.state): return node elif node.depth == limit: return 'cutoff' else: for successor in node.expand(problem): result = recursive_dls(successor, problem, limit) if result == 'cutoff': cutoff_occurred = True elif result != None: return result if cutoff_occurred: return 'cutoff' else: return None # Body of depth_limited_search: return recursive_dls(Node(problem.initial), problem, limit) def iterative_deepening_search(problem): "[Fig. 3.13]" for depth in xrange(sys.maxint): result = depth_limited_search(problem, depth) if result is not 'cutoff': return result
# Informed (Heuristic) Search def best_first_graph_search(problem, f): """Search the nodes with the lowest f scores first. You specify the function f(node) that you want to minimize; for example, if f is a heuristic estimate to the goal, then we have greedy best first search; if f is node.depth then we have depth-first search. There is a subtlety: the line "f = memoize(f, 'f')" means that the f values will be cached on the nodes as they are computed. So after doing a best first search you can examine the f values of the path returned.""" f = memoize(f, 'f') return graph_search(problem, PriorityQueue(min, f)) greedy_best_first_graph_search = best_first_graph_search # Greedy best-first search is accomplished by specifying f(n) = h(n). def astar_search(problem, h=None): """A* search is best-first graph search with f(n) = g(n)+h(n). You need to specify the h function when you call astar_search. Uses the pathmax trick: f(n) = max(f(n), g(n)+h(n)).""" h = h or problem.h def f(n): return max(getattr(n, 'f', -infinity), n.path_cost + h(n)) return best_first_graph_search(problem, f)
## Other search algorithms def recursive_best_first_search(problem): "[Fig. 4.1]" def RBFS(problem, node, flimit): if problem.goal_test(node.state): return node successors = expand(node, problem) if len(successors) == 0: return None, infinity for s in successors: s.f = max(s.path_cost + s.h, node.f) while True: successors.sort(compare(lambda x: x.f)) # Order by lowest f value best = successors[0] if best.f > flimit: return None, best.f alternative = successors[1] result, best.f = RBFS(problem, best, min(flimit, alternative)) if result is not None: return result return RBFS(Node(problem.initial), infinity) def hill_climbing(problem): "[Fig. 4.4]" current = Node(problem.initial) while True: neighbor = argmin(expand(node, problem), lambda x: x.path_cost) if neighbor.path_cost > current.path_cost: return current.state current = neighbor def exp_schedule(k=20, lam=0.005, limit=100): "One possible schedule function for simulated annealing" return lambda t: if_(t < limit, k * math.exp(-lam * t), 0) def simulated_annealing(problem, schedule=exp_schedule()): "[Fig. 4.5]" current = Node(problem.initial) for t in xrange(sys.maxint): T = schedule(t) if T == 0: return current next = random.choice(expand(node. problem)) delta_e = next.path_cost - current.path_cost if delta_e > 0 or probability(math.exp(delta_e/T)): current = next def online_dfs_agent(a): "[Fig. 4.12]" pass #### more def lrta_star_agent(a): "[Fig. 4.12]" pass #### more
# Genetic Algorithm def genetic_search(problem, fitness_fn, ngen=1000, pmut=0.0, n=20): """Call genetic_algorithm on the appropriate parts of a problem. This requires that the problem has a successor function that generates reasonable states, and that it has a path_cost function that scores states. We use the negative of the path_cost function, because costs are to be minimized, while genetic-algorithm expects a fitness_fn to be maximized.""" states = [s for (a, s) in problem.successor(problem.initial_state)[:n]] random.shuffle(states) fitness_fn = lambda s: - problem.path_cost(0, s, None, s) return genetic_algorithm(states, fitness_fn, ngen, pmut) def genetic_algorithm(population, fitness_fn, ngen=1000, pmut=0.0): """[Fig. 4.7]""" def reproduce(p1, p2): c = random.randrange(len(p1)) return p1[:c] + p2[c:] for i in range(ngen): new_population = [] for i in len(population): p1, p2 = random_weighted_selections(population, 2, fitness_fn) child = reproduce(p1, p2) if random.uniform(0,1) > pmut: child.mutate() new_population.append(child) population = new_population return argmax(population, fitness_fn) def random_weighted_selection(seq, n, weight_fn): """Pick n elements of seq, weighted according to weight_fn. That is, apply weight_fn to each element of seq, add up the total. Then choose an element e with probability weight[e]/total. Repeat n times, with replacement. Ex: random_weighted_selection(range(10), 3, lambda x: x * x)""" totals = []; runningtotal = 0 for item in seq: runningtotal += weight_fn(item) totals.append(runningtotal) print totals selections = [] for s in range(n): r = random.uniform(0, totals[-1]) for i in range(len(seq)): if totals[i] > r: selections.append(seq[i]) break return selections
# The remainder of this file implements examples for the search algorithms.
# Graphs class Graph: """A graph connects nodes (verticies) by edges (links). Each edge can also have a length associated with it. The constructor call is something like: g = Graph({'A': {'B': 1, 'C': 2}) this makes a graph with 3 nodes, A, B, and C, with an edge of length 1 from A to B, and an edge of length 2 from A to C. You can also do: g = Graph({'A': {'B': 1, 'C': 2}, directed=False) This makes an undirected graph, so inverse links are also added. The graph stays undirected; if you add more links with g.connect('B', 'C', 3), then inverse link is also added. You can use g.nodes() to get a list of nodes, g.get('A') to get a dict of links out of A, and g.get('A', 'B') to get the length of the link from A to B. 'Lengths' can actually be any object at all, and nodes can be any hashable object.""" def __init__(self, dict=None, directed=True): self.dict = dict or {} self.directed = directed if not directed: self.make_undirected() def make_undirected(self): "Make a digraph into an undirected graph by adding symmetric edges." for a in self.dict.keys(): for (b, distance) in self.dict[a].items(): self.connect1(b, a, distance) def connect(self, A, B, distance=1): """Add a link from A and B of given distance, and also add the inverse link if the graph is undirected.""" self.connect1(A, B, distance) if not self.directed: self.connect1(B, A, distance) def connect1(self, A, B, distance): "Add a link from A to B of given distance, in one direction only." self.dict.setdefault(A,{})[B] = distance def get(self, a, b=None): """Return a link distance or a dict of {node: distance} entries. .get(a,b) returns the distance or None; .get(a) returns a dict of {node: distance} entries, possibly {}.""" links = self.dict.setdefault(a, {}) if b is None: return links else: return links.get(b) def nodes(self): "Return a list of nodes in the graph." return self.dict.keys() def UndirectedGraph(dict=None): "Build a Graph where every edge (including future ones) goes both ways." return Graph(dict=dict, directed=False) def RandomGraph(nodes=range(10), min_links=2, width=400, height=300, curvature=lambda: random.uniform(1.1, 1.5)): """Construct a random graph, with the specified nodes, and random links. The nodes are laid out randomly on a (width x height) rectangle. Then each node is connected to the min_links nearest neighbors. Because inverse links are added, some nodes will have more connections. The distance between nodes is the hypotenuse times curvature(), where curvature() defaults to a random number between 1.1 and 1.5.""" g = UndirectedGraph() g.locations = {} ## Build the cities for node in nodes: g.locations[node] = (random.randrange(width), random.randrange(height)) ## Build roads from each city to at least min_links nearest neighbors. for i in range(min_links): for node in nodes: if len(g.get(node)) < min_links: here = g.locations[node] def distance_to_node(n): if n is node or g.get(node,n): return infinity return distance(g.locations[n], here) neighbor = argmin(nodes, distance_to_node) d = distance(g.locations[neighbor], here) * curvature() g.connect(node, neighbor, int(d)) return g romania = UndirectedGraph(Dict( A=Dict(Z=75, S=140, T=118), B=Dict(U=85, P=101, G=90, F=211), C=Dict(D=120, R=146, P=138), D=Dict(M=75), E=Dict(H=86), F=Dict(S=99), H=Dict(U=98), I=Dict(V=92, N=87), L=Dict(T=111, M=70), O=Dict(Z=71, S=151), P=Dict(R=97), R=Dict(S=80), U=Dict(V=142))) romania.locations = Dict( A=( 91, 492), B=(400, 327), C=(253, 288), D=(165, 299), E=(562, 293), F=(305, 449), G=(375, 270), H=(534, 350), I=(473, 506), L=(165, 379), M=(168, 339), N=(406, 537), O=(131, 571), P=(320, 368), R=(233, 410), S=(207, 457), T=( 94, 410), U=(456, 350), V=(509, 444), Z=(108, 531)) australia = UndirectedGraph(Dict( T=Dict(), SA=Dict(WA=1, NT=1, Q=1, NSW=1, V=1), NT=Dict(WA=1, Q=1), NSW=Dict(Q=1, V=1))) class GraphProblem(Problem): "The problem of searching a graph from one node to another." def __init__(self, initial, goal, graph): Problem.__init__(self, initial, goal) self.graph = graph def succ(self, A): "Return a list of (action, result) pairs." return [(B, B) for B in self.graph.get(A).keys()] def path_cost(self, cost_so_far, A, action, B): return cost_so_far + (self.graph.get(A,B) or infinity) def h(self, node): "h function is straight-line distance from a node's state to goal." locs = getattr(self.graph, 'locations') if locs: return int(distance(locs[node.state], locs[self.goal])) else: return infinity
class InstrumentedProblem(Problem): """Delegates to a problem, and keeps statistics.""" def __init__(self, problem): self.problem = problem self.succs = self.goal_tests = self.states = 0 self.found = None def succ(self, state): "Return a list of (action, state) pairs reachable from this state." result = self.problem.succ(state) self.succs += 1; self.states += len(result) return result def goal_test(self, state): "Return true if the state is a goal. By default, compare to self.goal." self.goal_tests += 1 result = self.problem.goal_test(state) if result: self.found = state return result def __getattr__(self, attr): if attr in ('succs', 'goal_tests', 'states'): return self.__dict__[attr] else: return getattr(self.problem, attr) def __repr__(self): return '<%3d/%3d/%3d/%s>' % (self.succs, self.goal_tests, self.states, str(self.found)[0:4]) def compare(problems=[GraphProblem('A', 'B', romania), GraphProblem('O', 'N', romania)], header=['', 'Romania(A,B)', 'Romania(O, N)'], searchers=[breadth_first_tree_search, #depth_first_tree_search, breadth_first_graph_search, #depth_first_graph_search iterative_deepening_search, astar_search ]): def do(searcher, problem): p = InstrumentedProblem(problem) searcher(p) return p table = [[name(s)] + [do(s, p) for p in problems] for s in searchers] print_table(table, header)



# Copyright: Peter Norvig, 2002.
# AIMA: Python Code, Example Output.
# Python.org: Tutorial, Language Ref, Libraries.