Priority Queue(优先队列)是一种根据优先级对集合元素排序的数据结构,根据定义可以分为最大优先队列和最小优先队列.与之对应的有最大堆和最小堆.应用在排序算法上就是堆排序.在python中heapq包就是最小堆的具体实现,可以使用heapq.heappush()和heapq.heappop()来对堆进行操作,如果要将一个已有数组堆化,可以调用heapq.heapify()函数.在普林斯顿的数据结构与算法课程上,使用最小优先队列来解决了经典的8-Puzzle问题(就是3*3的拼图问题).下面是其具体实现,输入一个打乱顺序的拼图,可以输出求解步骤或者无法求解.(注意返回的字符串是空格子(0)的移动方向,与具体的拼图操作恰好相反).略加修改,可以实现任意规格的puzzle求解(下面包含一个4*4的board和一个3*4的board)
思路是:对于初始board,同时使用2个最小优先队列来求解.据说交换同一行2个格子得到一个新的board,与交换之前的board总只有一个可以求解.在循环过程中,不停的求当前board的neighbors(也就是空格子上下左右移动一格之后的board),然后用这些board的hannming distance+Manhattan distance+previous move作为排序键值插入最小堆.每次都取最小堆的堆顶元素作为下次search的起点.这样就可以保证永远在朝着目标前进.直到2个最小队列的堆顶其中一个被求解.然后回溯回去获得求解route.
class Board(object): def __init__(self,mat): self.m = len(mat) self.n = len(mat[0]) self.blocks = [[0]*self.n for row in range(self.m)] self.dist = 0 self.hNum = 0 # hamming distance for i in range(self.m): for j in range(self.n): self.blocks[i][j] = mat[i][j] if mat[i][j]==0: self.row0 = i self.col0 = j elif self.blocks[i][j]!=i*self.n+j+1: self.hNum += 1 # hamming distance r,c = divmod(self.blocks[i][j]-1,self.n) self.dist += (abs(i-r)+abs(j-c)) # manhanttan distance def get_dist(self): # get manhattan distance return self.dist def is_goal(self): # check if it's goal position, hamming distance return self.hNum==0 def get_zero(self): # get zeros postion return (self.row0,self.col0) def twin(self): # get twin board by swap 2 in a row row1, col1 = -1,-1 tt = [[0]*self.n for row in range(self.m)] for i in range(self.m): for j in range(self.n): tt[i][j] = self.blocks[i][j] if row1==1 and j<self.n-1 and self.blocks[i][j]!=0 and \ self.blocks[i][j+1]!=0: row1,col1 = i,j # swap tt[row1][col1],tt[row1][col1+1] = tt[row1][col1+1],tt[row1][col1] return Board(tt) def equals(self,y): # check equals return sum(self.blocks,[])==sum(y.blocks,[]) def neighbors(self): # get all neighbors by swap 1 piece nbs = [] if self.row0>0: # move down self.blocks[self.row0][self.col0] = self.blocks[self.row0-1][self.col0] self.blocks[self.row0-1][self.col0] = 0 nbs.append(Board(self.blocks)) self.blocks[self.row0-1][self.col0] = self.blocks[self.row0][self.col0] self.blocks[self.row0][self.col0] = 0 if self.row0<self.m-1: # move up self.blocks[self.row0][self.col0] = self.blocks[self.row0+1][self.col0] self.blocks[self.row0+1][self.col0] = 0 nbs.append(Board(self.blocks)) self.blocks[self.row0+1][self.col0] = self.blocks[self.row0][self.col0] self.blocks[self.row0][self.col0] = 0 if self.col0>0: # move right self.blocks[self.row0][self.col0] = self.blocks[self.row0][self.col0-1] self.blocks[self.row0][self.col0-1] = 0 nbs.append(Board(self.blocks)) self.blocks[self.row0][self.col0-1] = self.blocks[self.row0][self.col0] self.blocks[self.row0][self.col0] = 0 if self.col0<self.n-1: # move left self.blocks[self.row0][self.col0] = self.blocks[self.row0][self.col0+1] self.blocks[self.row0][self.col0+1] = 0 nbs.append(Board(self.blocks)) self.blocks[self.row0][self.col0+1] = self.blocks[self.row0][self.col0] self.blocks[self.row0][self.col0] = 0 return nbs def __repr__(self): return '\n'.join([' '.join(['{0:2d}'.format(col) for col in row]) for row in self.blocks]) class SearchNode(object): def __init__(self,b,m=0,p=None): self.node = b self.move = m self.prev = p def dist(self): # 2 distances + previous_moves return self.node.get_dist()+self.move+self.node.hNum def __repr__(self): return str(self.node) import heapq # min pq class Solver(object): # min priority queue solution def __init__(self,b): self.solvable = False if b.is_goal(): self.solvable = True self.route = [] initStep = [] a = SearchNode(b) heapq.heappush(initStep,(a.dist(),a)) swapStep = [] swapa = SearchNode(b.twin()) heapq.heappush(swapStep,(swapa.dist(),swapa)) # if initial is not goal, try to solve it, run 2 MinPQ at the same time if not self.solvable: while len(initStep)>0: s1 = heapq.heappop(initStep)[1] s2 = heapq.heappop(swapStep)[1] # solved ? if s1.node.is_goal() or s2.node.is_goal(): # original if s1.node.is_goal(): self.solvable = True # set the route while s1: self.route.append(s1.node) s1 = s1.prev #self.route.append(b) else: self.solvable = False break else: for b in s1.node.neighbors(): if s1.prev==None or not b.equals(s1.prev.node): c = SearchNode(b,s1.move+1,s1) heapq.heappush(initStep,(c.dist(),c)) for b in s2.node.neighbors(): if s2.prev==None or not b.equals(s2.prev.node): c = SearchNode(b,s2.move+1,s2) heapq.heappush(swapStep,(c.dist(),c)) def is_solvable(self): return self.solvable def moves(self): if self.is_solvable(): if len(self.route)==0: return 0 else: return len(self.route)-1 else: return -1 def solution(self): if self.is_solvable(): return self.route else: return None def checkio(puzzle): """ Solve the puzzle U - up D - down L - left R - right """ a = Board(puzzle) b = Solver(a) r = [] if b.is_solvable(): m = len(b.solution()) if m==0: print 'Original is goal\n' return '' else: for i in range(m-1): print b.solution()[m-1-i] r1,c1 = b.solution()[i].get_zero() # current r2,c2 = b.solution()[i+1].get_zero() # next print if r1<r2: # up r.append('U') elif r1>r2: # down r.append('D') elif c1<c2: # left r.append('L') elif c1>c2: # right r.append('R') print b.solution()[0] print return ''.join(r[::-1]) else: print 'Canot be solved\n' return '' if __name__ == '__main__': print(checkio([[1, 2, 3], [4, 6, 8], [7, 5, 0]])) print print(checkio([[1, 2, 3], [4, 6, 8], [5, 7, 0]])) print print checkio([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 0], [13, 14, 15, 12]]) print print checkio([[1, 3, 0], [4, 2, 6], [7, 5, 8], [10, 11, 9]])
结果:
1 2 3 4 6 8 7 5 0 1 2 3 4 6 0 7 5 8 1 2 3 4 0 6 7 5 8 1 2 3 4 5 6 7 0 8 1 2 3 4 5 6 7 8 0 ULDR Canot be solved 1 2 3 4 5 6 7 8 9 10 11 0 13 14 15 12 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 D 1 3 0 4 2 6 7 5 8 10 11 9 1 0 3 4 2 6 7 5 8 10 11 9 1 2 3 4 0 6 7 5 8 10 11 9 1 2 3 4 5 6 7 0 8 10 11 9 1 2 3 4 5 6 7 8 0 10 11 9 1 2 3 4 5 6 7 8 9 10 11 0 LDDRD
No comments :
Post a Comment