搜索算法进阶:从原理到工程实现 搜索算法概述 搜索问题定义 搜索问题 = 状态空间 + 初始状态 + 目标状态 + 操作符 + 代价函数 启发式搜索 A* 算法 IDA* 算法(迭代加深) 启发式搜索 贪婪最佳优先搜索(GBFS) A* vs GBFS A*: 使用启发式函数 h(n) 与已走代价 g(n) 启发式可采纳时保证最优解 需要额外内存存储 f_score GBFS: 仅使用启发式函数 h(n),不计已走代价 不保证最优解 内存消耗小 局部搜索 模拟退火 约束满足 约束满足问题(CSP) 最短路径(Dijkstra) 元启发式搜索 遗传算法 实际应用 路径规划 游戏AI 推荐系统 总结 搜索算法要点: 启发式:A*、IDA* 在启发式可采纳时保证最优解 贪婪:快速但不一定最优 随机优化:模拟退火、遗传算法 约束满足:回溯、分支限界 实际应用:路径规划、游戏AI、推荐系统
搜索问题 = 状态空间 + 初始状态 + 目标状态 + 操作符 + 代价函数
# State space: each node maps to the list of nodes reachable from it.
# NOTE: the 'Goal' entry lives in the same mapping as the adjacency lists,
# so code that iterates over nodes must skip that key.
state_space = {
    'A': ['B', 'C'],
    'B': ['A', 'C'],
    'C': ['A', 'B'],
    'Goal': 'C',
}

# Operators and their costs, keyed by 'source->target'.
actions = {
    'A->B': {'cost': 5},
    'A->C': {'cost': 15},
    'B->A': {'cost': 5},
    'B->C': {'cost': 10},
    'C->A': {'cost': 20},
    # Fixed: the key was written with mismatched quotes ('cost"), which is a
    # syntax error.
    'C->B': {'cost': 10},
}
import heapq


class AStar:
    """A* shortest-path search over a weighted adjacency mapping.

    ``graph`` maps each node to a ``{neighbor: edge_cost}`` dict. Edge costs
    are expected to be non-negative. The returned path is only guaranteed to
    be optimal when the heuristic never overestimates the remaining cost.
    """

    def __init__(self, heuristic=None):
        """Configure the heuristic used to estimate the remaining cost.

        Args:
            heuristic: Callable ``(node, goal) -> number``. When omitted, the
                built-in :meth:`heuristic` method is used.
        """
        # Only shadow the built-in method when the caller supplies one;
        # assigning unconditionally made the class's own default unreachable.
        if heuristic is not None:
            self.heuristic = heuristic

    def search(self, start, goal, graph):
        """Return the cheapest path from ``start`` to ``goal``.

        Args:
            start: Node to start from.
            goal: Node to reach.
            graph: Mapping ``node -> {neighbor: cost}``. Nodes without an
                entry are treated as having no outgoing edges.

        Returns:
            The list of nodes from ``start`` to ``goal`` inclusive, or
            ``None`` when ``goal`` is unreachable.
        """
        g_score = {start: 0}
        came_from = {}
        # Heap entries are (f, tie, g_at_push, node). The unique ``tie``
        # counter keeps the heap from ever comparing node objects.
        tie = 0
        open_heap = [(self.heuristic(start, goal), tie, 0, start)]

        while open_heap:
            _, _, g_at_push, current = heapq.heappop(open_heap)
            if g_at_push > g_score[current]:
                # Stale entry: a cheaper route to this node was queued later.
                continue
            if current == goal:
                return self.reconstruct_path(came_from, current)

            for neighbor, cost in graph.get(current, {}).items():
                tentative_g = g_score[current] + cost
                if neighbor not in g_score or tentative_g < g_score[neighbor]:
                    came_from[neighbor] = current
                    g_score[neighbor] = tentative_g
                    tie += 1
                    f_score = tentative_g + self.heuristic(neighbor, goal)
                    heapq.heappush(
                        open_heap, (f_score, tie, tentative_g, neighbor)
                    )
        return None

    def reconstruct_path(self, came_from, current):
        """Walk the ``came_from`` links back from ``current`` to the start.

        Returns:
            The path as a list ordered from the start node to ``current``.
        """
        path = [current]
        while current in came_from:
            current = came_from[current]
            path.append(current)
        path.reverse()
        return path

    def heuristic(self, a, b):
        """Default heuristic: absolute difference of two code points.

        Only usable when nodes are single-character strings (``ord``
        requires that). NOTE(review): this is a one-dimensional distance on
        character codes, not a true Manhattan distance, and nothing makes it
        admissible for an arbitrary graph - supply a problem-specific
        heuristic when optimality matters.
        """
        return abs(ord(a) - ord(b))
class IDAStar:
    """Iterative-deepening A* over a weighted adjacency mapping.

    Runs repeated depth-first probes, each limited by a bound on
    ``f = g + h``. After a failed probe the bound is raised to the smallest
    ``f`` value that exceeded it. Only the current path is kept in memory.
    """

    def __init__(self, heuristic):
        """Store the heuristic.

        Args:
            heuristic: Callable ``(node, goal) -> number`` estimating the
                remaining cost. The result is only guaranteed optimal when
                it never overestimates.
        """
        self.heuristic = heuristic
        # Retained for backward compatibility; the search does not read it.
        self.threshold = 0.01

    def search(self, start, goal, graph):
        """Return a cheapest path from ``start`` to ``goal``.

        Args:
            start: Node to start from.
            goal: Node to reach.
            graph: Mapping ``node -> {neighbor: cost}``. Nodes without an
                entry are treated as having no outgoing edges.

        Returns:
            The list of nodes from ``start`` to ``goal`` inclusive, or
            ``None`` when ``goal`` is unreachable.
        """
        bound = self.heuristic(start, goal)
        path = [start]
        on_path = {start}
        while True:
            found, next_bound = self._probe(path, on_path, 0, bound, goal, graph)
            if found:
                return list(path)
            if next_bound == float('inf'):
                # Nothing was cut off by the bound, so the reachable part of
                # the graph is exhausted.
                return None
            bound = next_bound

    def _probe(self, path, on_path, g, bound, goal, graph):
        """Depth-first probe from ``path[-1]`` limited by ``bound``.

        Returns:
            ``(True, f)`` when the goal was reached (``path`` then holds the
            solution), otherwise ``(False, m)`` where ``m`` is the smallest
            ``f`` value that exceeded ``bound`` (``inf`` if none did).
        """
        node = path[-1]
        f = g + self.heuristic(node, goal)
        if f > bound:
            return False, f
        if node == goal:
            return True, f

        smallest = float('inf')
        for neighbor, cost in graph.get(node, {}).items():
            if neighbor in on_path:
                # Never revisit a node already on the current path.
                continue
            path.append(neighbor)
            on_path.add(neighbor)
            found, candidate = self._probe(
                path, on_path, g + cost, bound, goal, graph
            )
            if found:
                return True, candidate
            path.pop()
            on_path.discard(neighbor)
            smallest = min(smallest, candidate)
        return False, smallest
import heapq
from collections import deque  # no longer used here; existing import kept


class GBFS:
    """Greedy best-first search over a weighted adjacency mapping.

    The frontier is ordered only by the heuristic estimate of each node, so
    the path found is not guaranteed to be the cheapest one.
    """

    def __init__(self, heuristic=None):
        """Configure the search.

        Args:
            heuristic: Optional callable ``(node, goal) -> number``. When
                omitted every node gets the same priority, so nodes are
                expanded in insertion order (breadth-first, with neighbors
                inserted cheapest edge first).
        """
        self.heuristic = heuristic
        # Retained for backward compatibility; the search uses a local heap.
        self.frontier = []

    def search(self, start, goal, graph):
        """Return a path from ``start`` to ``goal``.

        Args:
            start: Node to start from.
            goal: Node to reach.
            graph: Mapping ``node -> {neighbor: cost}``. Nodes without an
                entry are treated as having no outgoing edges.

        Returns:
            The list of nodes from ``start`` to ``goal`` inclusive, each node
            appearing once, or ``None`` when ``goal`` is unreachable.
        """
        visited = set()
        # Entries are (priority, order, node, path). ``order`` is unique, so
        # ties fall back to insertion order and nodes are never compared.
        order = 0
        frontier = [(self._priority(start, goal), order, start, [start])]

        while frontier:
            _, _, node, path = heapq.heappop(frontier)
            if node == goal:
                return path
            if node in visited:
                continue
            visited.add(node)

            edges = sorted(graph.get(node, {}).items(), key=lambda edge: edge[1])
            for neighbor, _cost in edges:
                if neighbor in visited:
                    continue
                order += 1
                # Fixed: the full neighbor key is used (the original took
                # ``neighbor[0]``, its first character) and the path is
                # extended by exactly one node.
                heapq.heappush(
                    frontier,
                    (
                        self._priority(neighbor, goal),
                        order,
                        neighbor,
                        path + [neighbor],
                    ),
                )
        return None

    def _priority(self, node, goal):
        """Return the frontier priority of ``node`` (lower is expanded first)."""
        if self.heuristic is None:
            return 0
        return self.heuristic(node, goal)
A*:
GBFS:
import math
import random


class SimulatedAnnealing:
    """Simulated-annealing local search with geometric cooling.

    ``get_neighbors`` and ``cost`` are hooks that define the problem.
    NOTE(review): the default hooks disagree about what a state is.
    ``get_neighbors`` uses the state as a key into ``graph``, while ``cost``
    treats it as a sequence of nodes. Override both consistently for a real
    problem.
    """

    def __init__(self, initial_temp, cooling_rate=0.95, min_temp=0.01):
        """Store the annealing schedule.

        Args:
            initial_temp: Starting temperature.
            cooling_rate: Factor the temperature is multiplied by on every
                iteration.
            min_temp: The search stops once the temperature drops below this.
        """
        self.initial_temp = initial_temp
        self.cooling_rate = cooling_rate
        self.min_temp = min_temp

    def search(self, start, graph, max_iterations=1000):
        """Run the annealing loop and return the best state seen.

        Args:
            start: Initial state.
            graph: Problem data handed to ``get_neighbors`` and ``cost``.
            max_iterations: Upper limit on the number of iterations.

        Returns:
            The lowest-cost state encountered. This is ``start`` itself when
            no move improves on it or when it has no neighbors.
        """
        current = start
        current_cost = self.cost(current, graph)
        best = current
        best_cost = current_cost
        temp = self.initial_temp

        for _ in range(max_iterations):
            temp *= self.cooling_rate
            # ``temp <= 0`` guards the division below when min_temp is 0.
            if temp < self.min_temp or temp <= 0:
                break

            # Materialise the neighborhood: the default hook returns a dict,
            # and random.choice needs an indexable, non-empty sequence.
            neighbors = list(self.get_neighbors(current, graph))
            if not neighbors:
                break  # dead end: nowhere to move from here

            neighbor = random.choice(neighbors)
            neighbor_cost = self.cost(neighbor, graph)

            # Metropolis criterion: always accept an improvement, otherwise
            # accept with probability exp(-delta / temp).
            delta = neighbor_cost - current_cost
            if delta < 0 or math.exp(-delta / temp) > random.random():
                current = neighbor
                current_cost = neighbor_cost
                if current_cost < best_cost:
                    best = current
                    best_cost = current_cost

        return best

    def get_neighbors(self, node, graph):
        """Return the candidates reachable from ``node`` (empty if unknown)."""
        return graph.get(node, {})

    def cost(self, path, graph):
        """Return the summed edge cost between consecutive elements of ``path``."""
        return sum(
            self.edge_cost(path[i], path[i + 1], graph)
            for i in range(len(path) - 1)
        )

    def edge_cost(self, from_node, to_node, graph):
        """Return the cost stored at ``graph[from_node][to_node]``."""
        return graph[from_node][to_node]
class CSPSolver:
    """Backtracking solver for constraint-satisfaction problems.

    A constraint is either

    * a ``(scope, predicate)`` pair, where ``scope`` is a sequence of
      variable names and ``predicate`` is called with their assigned values
      (in scope order) once all of them are assigned, or
    * a bare callable that receives the (possibly partial) assignment dict.

    Both must return a truthy value when the constraint holds.
    """

    def __init__(self):
        self.domains = {}
        self.constraints = []

    def solve(self, domains, constraints):
        """Return a complete consistent assignment, or ``None``.

        Args:
            domains: Mapping ``variable -> iterable of candidate values``.
            constraints: Iterable of constraints (see the class docstring).
        """
        return self.backtrack({}, domains, constraints)

    def backtrack(self, assignment, domains, constraints):
        """Extend ``assignment`` over the variables remaining in ``domains``.

        Variables are tried in the iteration order of ``domains`` and values
        in the order given by each domain.

        Returns:
            A dict assigning every variable, or ``None`` if no consistent
            extension exists.
        """
        if not domains:
            return assignment

        var = next(iter(domains))
        remaining = {k: v for k, v in domains.items() if k != var}
        for value in domains[var]:
            candidate = {**assignment, var: value}
            if not self.is_consistent(candidate, constraints):
                continue
            result = self.backtrack(candidate, remaining, constraints)
            # Compare with None explicitly: failure is signalled by None, not
            # by falsiness.
            if result is not None:
                return result
        return None

    def is_consistent(self, assignment, constraints):
        """Return ``True`` when ``assignment`` violates no constraint."""
        return all(
            self.satisfies(assignment, constraint) for constraint in constraints
        )

    def satisfies(self, assignment, constraint):
        """Check one constraint against a (possibly partial) assignment.

        A ``(scope, predicate)`` constraint whose scope is not fully assigned
        yet cannot be violated and is reported as satisfied.
        """
        if callable(constraint):
            return bool(constraint(assignment))
        scope, predicate = constraint
        if any(variable not in assignment for variable in scope):
            return True
        return bool(predicate(*(assignment[variable] for variable in scope)))
import heapq


class Dijkstra:
    """Dijkstra's shortest-path search over a weighted adjacency mapping.

    After a call to :meth:`search`, ``distances`` and ``previous`` describe
    that search only.
    """

    def __init__(self):
        self.distances = {}
        self.previous = {}

    def search(self, start, goal, graph):
        """Return the cheapest path from ``start`` to ``goal``.

        Args:
            start: Node to start from.
            goal: Node to reach.
            graph: Mapping ``node -> {neighbor: cost}`` with non-negative
                costs. Nodes without an entry are treated as having no
                outgoing edges.

        Returns:
            The list of nodes from ``start`` to ``goal`` inclusive, or
            ``None`` when ``goal`` is unreachable.
        """
        # Reset per-search state; reusing an instance previously carried
        # distances and predecessors over from an earlier start node.
        self.distances = {start: 0}
        self.previous = {}

        # Entries are (distance, order, node). ``order`` is unique, so node
        # objects are never compared when distances tie.
        order = 0
        pq = [(0, order, start)]
        visited = set()

        while pq:
            current_dist, _, current = heapq.heappop(pq)
            if current in visited:
                continue
            visited.add(current)

            if current == goal:
                return self.reconstruct_path(self.previous, goal)

            for neighbor, cost in graph.get(current, {}).items():
                distance = current_dist + cost
                if (
                    neighbor not in self.distances
                    or distance < self.distances[neighbor]
                ):
                    self.distances[neighbor] = distance
                    self.previous[neighbor] = current
                    order += 1
                    heapq.heappush(pq, (distance, order, neighbor))

        return None

    def reconstruct_path(self, previous, goal):
        """Follow ``previous`` links back from ``goal``.

        Returns:
            The path ordered from the start node to ``goal``. The start node
            (the one with no predecessor) is included.
        """
        path = [goal]
        current = goal
        while current in previous:
            current = previous[current]
            path.append(current)
        path.reverse()
        return path
import random


class GeneticAlgorithm:
    """Generational genetic algorithm over fixed-length bit strings.

    Individuals are strings of ``'0'`` and ``'1'``. Each generation applies
    tournament selection, single-point crossover and bit-flip mutation, and
    carries the best individuals of the previous generation over unchanged.
    """

    def __init__(self, pop_size=50, mutation_rate=0.01, crossover_rate=0.8):
        """Store the population size and operator probabilities."""
        self.pop_size = pop_size
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate

    def optimize(self, fitness_function, genome_length, generations=100):
        """Evolve a population and return its fittest individual.

        Args:
            fitness_function: Callable ``genome -> number``; higher is better.
            genome_length: Number of bits per individual.
            generations: Number of generations to run.

        Returns:
            The individual of the final population with the highest fitness.

        Raises:
            ValueError: If the population is empty (``pop_size`` of 0).
        """
        population = self.initialize_population(self.pop_size, genome_length)

        for _ in range(generations):
            scores = [fitness_function(individual) for individual in population]
            # Pick elites from the population the scores belong to, before it
            # is replaced by its offspring.
            elite = self.select_elite(population, scores)
            selected = self.select(population, scores)
            offspring = self.crossover(selected, self.crossover_rate)
            offspring = self.mutate(offspring, self.mutation_rate)
            population = elite + offspring[:len(offspring) - len(elite)]

        return max(population, key=fitness_function)

    def initialize_population(self, size, length):
        """Return ``size`` random bit strings of ``length`` characters."""
        return [
            ''.join(random.choice(['0', '1']) for _ in range(length))
            for _ in range(size)
        ]

    def select(self, population, scores, tournament_size=2):
        """Pick ``len(population)`` parents by tournament selection.

        Each parent is the best of ``tournament_size`` individuals drawn
        uniformly at random with replacement.
        """
        count = len(population)
        parents = []
        for _ in range(count):
            contenders = [random.randrange(count) for _ in range(tournament_size)]
            winner = max(contenders, key=scores.__getitem__)
            parents.append(population[winner])
        return parents

    def select_elite(self, population, scores, count=1):
        """Return the ``count`` highest-scoring individuals, best first."""
        ranked = sorted(
            range(len(population)), key=scores.__getitem__, reverse=True
        )
        return [population[index] for index in ranked[:count]]

    def crossover(self, parents, rate):
        """Recombine consecutive parent pairs with single-point crossover.

        A pair is crossed with probability ``rate``; otherwise both parents
        are copied. With an odd number of parents the last one is copied
        unchanged. Genomes shorter than two bits have no interior cut point
        and are copied as well.
        """
        offspring = []
        for i in range(0, len(parents) - 1, 2):
            parent1, parent2 = parents[i], parents[i + 1]
            if len(parent1) > 1 and random.random() < rate:
                point = random.randint(1, len(parent1) - 1)
                offspring.append(parent1[:point] + parent2[point:])
                offspring.append(parent2[:point] + parent1[point:])
            else:
                offspring.extend([parent1, parent2])
        if len(parents) % 2:
            offspring.append(parents[-1])
        return offspring

    def mutate(self, population, rate):
        """Flip one random bit of each individual with probability ``rate``.

        An individual that mutates has a further 10% chance of a second
        random bit flip.
        """
        mutated = []
        for individual in population:
            if individual and random.random() < rate:
                individual = self._flip_random_bit(individual)
                if random.random() < 0.1:
                    individual = self._flip_random_bit(individual)
            mutated.append(individual)
        return mutated

    @staticmethod
    def _flip_random_bit(individual):
        """Return ``individual`` with one uniformly chosen bit inverted."""
        pos = random.randint(0, len(individual) - 1)
        flipped = '1' if individual[pos] == '0' else '0'
        return individual[:pos] + flipped + individual[pos + 1:]
import heapq


class PathPlanner:
    """Route planner over a directed road graph built from map data."""

    def __init__(self, map_data):
        """Build the road graph from ``map_data`` (see :meth:`build_graph`)."""
        self.graph = self.build_graph(map_data)

    def plan(self, start, goal):
        """Plan a path from the start node to the goal node.

        Returns:
            The list of nodes from ``start`` to ``goal`` inclusive, or
            ``None`` when no route exists.
        """
        return self.a_star_search(start, goal, self.graph)

    def a_star_search(self, start, goal, graph, heuristic=None):
        """A* search over ``graph``.

        Args:
            start: Node to start from.
            goal: Node to reach.
            graph: Mapping ``node -> {neighbor: cost}``. Nodes without an
                entry have no outgoing roads.
            heuristic: Optional callable ``(node, goal) -> number``. The map
                data carries no coordinates, so the default estimate is 0,
                which makes the search behave like Dijkstra's algorithm.

        Returns:
            The cheapest path as a list of nodes, or ``None``.
        """
        def estimate(node):
            return 0 if heuristic is None else heuristic(node, goal)

        g_score = {start: 0}
        came_from = {}
        # Entries are (f, tie, g_at_push, node); ``tie`` is unique so nodes
        # are never compared.
        tie = 0
        open_heap = [(estimate(start), tie, 0, start)]

        while open_heap:
            _, _, g_at_push, current = heapq.heappop(open_heap)
            if g_at_push > g_score[current]:
                continue  # stale entry superseded by a cheaper route
            if current == goal:
                path = [current]
                while current in came_from:
                    current = came_from[current]
                    path.append(current)
                path.reverse()
                return path

            for neighbor, cost in graph.get(current, {}).items():
                tentative_g = g_score[current] + cost
                if neighbor not in g_score or tentative_g < g_score[neighbor]:
                    g_score[neighbor] = tentative_g
                    came_from[neighbor] = current
                    tie += 1
                    heapq.heappush(
                        open_heap,
                        (tentative_g + estimate(neighbor), tie, tentative_g, neighbor),
                    )
        return None

    def build_graph(self, map_data):
        """Build the graph from map data.

        Each entry of ``map_data['roads']`` contributes one directed edge
        ``road['from'] -> road['to']`` weighted by ``road['distance']``.
        """
        graph = {}
        for road in map_data['roads']:
            graph.setdefault(road['from'], {})[road['to']] = road['distance']
        return graph
import random


class GameAI:
    """Epsilon-greedy agent backed by a table of action-value estimates.

    Subclasses supply the game rules by overriding
    :meth:`get_valid_actions`.
    """

    # Class-level defaults keep subclasses that override ``__init__`` without
    # calling ``super().__init__`` working.
    epsilon = 0.1
    learning_rate = 0.1

    def __init__(self, game_state, epsilon=0.1, learning_rate=0.1):
        """Create the agent.

        Args:
            game_state: Initial game state, stored as ``self.state``.
            epsilon: Probability of taking a random exploratory action.
            learning_rate: Step size used by :meth:`update_q_value`.
        """
        self.state = game_state
        self.q_table = {}  # state -> {action: estimated value}
        self.epsilon = epsilon
        self.learning_rate = learning_rate

    def choose_action(self, state):
        """Choose an action for ``state`` with an epsilon-greedy policy.

        Returns:
            A random valid action with probability ``epsilon``, otherwise
            the action with the highest estimated value. ``None`` when the
            state has no valid actions.
        """
        if random.random() < self.epsilon:
            actions = list(self.get_valid_actions(state))
            return random.choice(actions) if actions else None
        return self.get_best_action(state)

    def get_valid_actions(self, state):
        """Return the actions available in ``state``.

        Raises:
            NotImplementedError: Always; the rules are game-specific and
                must be provided by a subclass.
        """
        raise NotImplementedError('subclasses must implement get_valid_actions')

    def get_best_action(self, state):
        """Return the valid action with the highest estimated value.

        Actions without an estimate count as 0. Ties go to the action listed
        first by :meth:`get_valid_actions`. Returns ``None`` when there are
        no valid actions.
        """
        actions = list(self.get_valid_actions(state))
        if not actions:
            return None
        estimates = self.q_table.get(state, {})
        return max(actions, key=lambda action: estimates.get(action, 0))

    def update_q_value(self, state, action, reward):
        """Move the estimate for ``(state, action)`` toward ``reward``.

        NOTE(review): the update uses the immediate reward only - there is
        no discounted next-state term as in full Q-learning.
        """
        estimates = self.q_table.setdefault(state, {})
        current_q = estimates.get(action, 0)
        estimates[action] = current_q + self.learning_rate * (reward - current_q)
import math


class RecommenderSystem:
    """User-based collaborative-filtering recommender.

    NOTE(review): the helper methods assume ``user_item_matrix`` is a
    mapping ``user_id -> {item: score}`` - confirm against the caller.
    ``item_features`` is stored but not used by this class.
    """

    def __init__(self, user_item_matrix, item_features):
        self.user_item_matrix = user_item_matrix
        self.item_features = item_features

    def recommend(self, user_id, top_n=10):
        """Recommend items for a user.

        Sums, per item, the scores given by the users most similar to
        ``user_id`` and returns the ``top_n`` items with the highest totals.
        Items the user already has are not filtered out.
        """
        similar_users = self.find_similar_users(user_id, top_k=20)

        # Accumulate the similar users' scores per item.
        totals = {}
        for user in similar_users:
            for item, score in self.get_user_items(user).items():
                totals[item] = totals.get(item, 0) + score

        # Highest total first; ties keep first-seen order.
        ranked = sorted(totals.items(), key=lambda pair: pair[1], reverse=True)
        return [item for item, _ in ranked[:top_n]]

    def get_user_items(self, user_id):
        """Return the ``{item: score}`` mapping of ``user_id`` (empty if unknown)."""
        return self.user_item_matrix.get(user_id, {})

    def find_similar_users(self, user_id, top_k=20):
        """Return up to ``top_k`` other users, most similar first.

        Similarity is the cosine of the two users' score vectors. Users with
        no positive similarity to ``user_id`` are left out.
        """
        target = self.get_user_items(user_id)
        scored = []
        for other in self.user_item_matrix:
            if other == user_id:
                continue
            similarity = self._cosine(target, self.get_user_items(other))
            if similarity > 0:
                scored.append((similarity, other))
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [user for _, user in scored[:top_k]]

    @staticmethod
    def _cosine(left, right):
        """Return the cosine similarity of two ``{item: score}`` mappings."""
        dot = sum(score * right[item] for item, score in left.items() if item in right)
        if not dot:
            return 0.0
        left_norm = math.sqrt(sum(score * score for score in left.values()))
        right_norm = math.sqrt(sum(score * score for score in right.values()))
        return dot / (left_norm * right_norm)
搜索算法要点:
掌握搜索算法,解决复杂优化问题!