
Priority Queue(优先队列)是一种根据优先级对集合元素排序的数据结构,根据定义可以分为最大优先队列和最小优先队列.与之对应的有最大堆和最小堆.应用在排序算法上就是堆排序.在python中heapq包就是最小堆的具体实现,可以使用heapq.heappush()和heapq.heappop()来对堆进行操作,如果要将一个已有数组堆化,可以调用heapq.heapify()函数.在普林斯顿的数据结构与算法课程上,使用最小优先队列来解决了经典的8-Puzzle问题(就是3*3的拼图问题).下面是其具体实现,输入一个打乱顺序的拼图,可以输出求解步骤或者无法求解.(注意返回的字符串是空格子(0)的移动方向,与具体的拼图操作恰好相反).略加修改,可以实现任意规格的puzzle求解(下面包含一个4*4的board和一个3*4的board)
思路是:对于初始board,同时使用2个最小优先队列来求解.据说交换同一行2个格子得到一个新的board,与交换之前的board总只有一个可以求解.在循环过程中,不停的求当前board的neighbors(也就是空格子上下左右移动一格之后的board),然后用这些board的hannming distance+Manhattan distance+previous move作为排序键值插入最小堆.每次都取最小堆的堆顶元素作为下次search的起点.这样就可以保证永远在朝着目标前进.直到2个最小队列的堆顶其中一个被求解.然后回溯回去获得求解route.
class Board(object):
def __init__(self,mat):
self.m = len(mat)
self.n = len(mat[0])
self.blocks = [[0]*self.n for row in range(self.m)]
self.dist = 0
self.hNum = 0 # hamming distance
for i in range(self.m):
for j in range(self.n):
self.blocks[i][j] = mat[i][j]
if mat[i][j]==0:
self.row0 = i
self.col0 = j
elif self.blocks[i][j]!=i*self.n+j+1:
self.hNum += 1 # hamming distance
r,c = divmod(self.blocks[i][j]-1,self.n)
self.dist += (abs(i-r)+abs(j-c)) # manhanttan distance
def get_dist(self):
# get manhattan distance
return self.dist
def is_goal(self):
# check if it's goal position, hamming distance
return self.hNum==0
def get_zero(self):
# get zeros postion
return (self.row0,self.col0)
def twin(self):
# get twin board by swap 2 in a row
row1, col1 = -1,-1
tt = [[0]*self.n for row in range(self.m)]
for i in range(self.m):
for j in range(self.n):
tt[i][j] = self.blocks[i][j]
if row1==1 and j<self.n-1 and self.blocks[i][j]!=0 and \
self.blocks[i][j+1]!=0:
row1,col1 = i,j
# swap
tt[row1][col1],tt[row1][col1+1] = tt[row1][col1+1],tt[row1][col1]
return Board(tt)
def equals(self,y):
# check equals
return sum(self.blocks,[])==sum(y.blocks,[])
def neighbors(self):
# get all neighbors by swap 1 piece
nbs = []
if self.row0>0: # move down
self.blocks[self.row0][self.col0] = self.blocks[self.row0-1][self.col0]
self.blocks[self.row0-1][self.col0] = 0
nbs.append(Board(self.blocks))
self.blocks[self.row0-1][self.col0] = self.blocks[self.row0][self.col0]
self.blocks[self.row0][self.col0] = 0
if self.row0<self.m-1: # move up
self.blocks[self.row0][self.col0] = self.blocks[self.row0+1][self.col0]
self.blocks[self.row0+1][self.col0] = 0
nbs.append(Board(self.blocks))
self.blocks[self.row0+1][self.col0] = self.blocks[self.row0][self.col0]
self.blocks[self.row0][self.col0] = 0
if self.col0>0: # move right
self.blocks[self.row0][self.col0] = self.blocks[self.row0][self.col0-1]
self.blocks[self.row0][self.col0-1] = 0
nbs.append(Board(self.blocks))
self.blocks[self.row0][self.col0-1] = self.blocks[self.row0][self.col0]
self.blocks[self.row0][self.col0] = 0
if self.col0<self.n-1: # move left
self.blocks[self.row0][self.col0] = self.blocks[self.row0][self.col0+1]
self.blocks[self.row0][self.col0+1] = 0
nbs.append(Board(self.blocks))
self.blocks[self.row0][self.col0+1] = self.blocks[self.row0][self.col0]
self.blocks[self.row0][self.col0] = 0
return nbs
def __repr__(self):
return '\n'.join([' '.join(['{0:2d}'.format(col) for col in row]) for row in self.blocks])
class SearchNode(object):
def __init__(self,b,m=0,p=None):
self.node = b
self.move = m
self.prev = p
def dist(self): # 2 distances + previous_moves
return self.node.get_dist()+self.move+self.node.hNum
def __repr__(self):
return str(self.node)
import heapq # min pq
class Solver(object):
# min priority queue solution
def __init__(self,b):
self.solvable = False
if b.is_goal(): self.solvable = True
self.route = []
initStep = []
a = SearchNode(b)
heapq.heappush(initStep,(a.dist(),a))
swapStep = []
swapa = SearchNode(b.twin())
heapq.heappush(swapStep,(swapa.dist(),swapa))
# if initial is not goal, try to solve it, run 2 MinPQ at the same time
if not self.solvable:
while len(initStep)>0:
s1 = heapq.heappop(initStep)[1]
s2 = heapq.heappop(swapStep)[1]
# solved ?
if s1.node.is_goal() or s2.node.is_goal():
# original
if s1.node.is_goal():
self.solvable = True
# set the route
while s1:
self.route.append(s1.node)
s1 = s1.prev
#self.route.append(b)
else:
self.solvable = False
break
else:
for b in s1.node.neighbors():
if s1.prev==None or not b.equals(s1.prev.node):
c = SearchNode(b,s1.move+1,s1)
heapq.heappush(initStep,(c.dist(),c))
for b in s2.node.neighbors():
if s2.prev==None or not b.equals(s2.prev.node):
c = SearchNode(b,s2.move+1,s2)
heapq.heappush(swapStep,(c.dist(),c))
def is_solvable(self):
return self.solvable
def moves(self):
if self.is_solvable():
if len(self.route)==0: return 0
else: return len(self.route)-1
else:
return -1
def solution(self):
if self.is_solvable():
return self.route
else:
return None
def checkio(puzzle):
"""
Solve the puzzle
U - up
D - down
L - left
R - right
"""
a = Board(puzzle)
b = Solver(a)
r = []
if b.is_solvable():
m = len(b.solution())
if m==0:
print 'Original is goal\n'
return ''
else:
for i in range(m-1):
print b.solution()[m-1-i]
r1,c1 = b.solution()[i].get_zero() # current
r2,c2 = b.solution()[i+1].get_zero() # next
print
if r1<r2: # up
r.append('U')
elif r1>r2: # down
r.append('D')
elif c1<c2: # left
r.append('L')
elif c1>c2: # right
r.append('R')
print b.solution()[0]
print
return ''.join(r[::-1])
else:
print 'Canot be solved\n'
return ''
if __name__ == '__main__':
print(checkio([[1, 2, 3],
[4, 6, 8],
[7, 5, 0]]))
print
print(checkio([[1, 2, 3],
[4, 6, 8],
[5, 7, 0]]))
print
print checkio([[1, 2, 3, 4],
[5, 6, 7, 8],
[9, 10, 11, 0],
[13, 14, 15, 12]])
print
print checkio([[1, 3, 0],
[4, 2, 6],
[7, 5, 8],
[10, 11, 9]])结果:
1 2 3 4 6 8 7 5 0 1 2 3 4 6 0 7 5 8 1 2 3 4 0 6 7 5 8 1 2 3 4 5 6 7 0 8 1 2 3 4 5 6 7 8 0 ULDR Canot be solved 1 2 3 4 5 6 7 8 9 10 11 0 13 14 15 12 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 D 1 3 0 4 2 6 7 5 8 10 11 9 1 0 3 4 2 6 7 5 8 10 11 9 1 2 3 4 0 6 7 5 8 10 11 9 1 2 3 4 5 6 7 0 8 10 11 9 1 2 3 4 5 6 7 8 0 10 11 9 1 2 3 4 5 6 7 8 9 10 11 0 LDDRD
No comments :
Post a Comment