view pymctf.py @ 5:b235e08ebd04

cleaner recursion, eliminate parameters

author   | Peter Meerwald <pmeerw@cosy.sbg.ac.at>
date     | Tue, 18 Dec 2007 15:16:56 +0100
parents  | 4fc1d403ad14
children |
# MCTF (motion-compensated temporal filtering) following Ohm04a

import pywt
import numpy
import math
import sys

import _me

import psyco
psyco.full()

# type of motion vectors
UNCONNECTED = -(sys.maxint)
CONNECTED = -(sys.maxint-1)
MULTIPLE_CONNECTED = -(sys.maxint-2)

# temporal low-pass frame position
LEFT = -1
MIDDLE = 0
RIGHT = 1


def apply_mc(a, mvf=None, bs=8):
    # apply block-based motion compensation to frame a using the motion field mvf
    mc_a = numpy.empty(a.shape, numpy.float)
    for r in xrange(0, a.shape[0], bs):
        for c in xrange(0, a.shape[1], bs):
            mv = mvf[r,c]
            rm, cm = r+mv[0], c+mv[1]
            mc_a[r:r+bs,c:c+bs] = a[rm:rm+bs,cm:cm+bs]
    return mc_a


def apply_mc_all(w, mvfs, bs=8, tlp=MIDDLE):
    # repeatedly double the frame list, pairing each frame with its
    # motion-compensated version, consuming the motion fields level by level
    ws_in = [w]
    take = 1
    while take <= len(mvfs):
        ws_out = []
        if tlp == RIGHT:
            left = 0; mid = take; right = 0
        elif tlp == LEFT:
            left = 0; mid = 0; right = take
        else:
            left = 0; mid = max(take/2, 1); right = take
        for i in xrange(left, mid):
            ws_out.append(apply_mc(ws_in[i], mvfs[i], bs))
            ws_out.append(ws_in[i])
        for i in xrange(mid, right):
            ws_out.append(ws_in[i])
            ws_out.append(apply_mc(ws_in[i], mvfs[i], bs))
        ws_in = ws_out
        del mvfs[:take]
        take *= 2
    return ws_in


def me(a, refblock, rc, cc, sr):
    # full-search block matching: return position and SAD of the best match
    # for refblock in frame a within search range sr around (rc, cc)
    min_sad = sys.maxint
    min_r, min_c = 0, 0
    bs = refblock.shape[0]
    for rs in xrange(max(0,rc-sr), min(a.shape[0]-bs,rc+sr)+1):
        for cs in xrange(max(0,cc-sr), min(cc+sr,a.shape[1]-bs)+1):
            sad = numpy.sum(numpy.abs(refblock - a[rs:rs+bs, cs:cs+bs]))
            if sad < min_sad:
                # found new local block SAD minimum, store motion vector
                min_r, min_c, min_sad = rs, cs, sad
    return min_r, min_c, min_sad


def motion_estimation(a, b, blocksize=8, searchrange=8, hlevel=2):
    '''
    Hierarchical motion estimation from frame a to frame b.
    Parameters are blocksize, searchrange and search hierarchy level.
    Precision is full pixel only.
    Returns the sum-of-absolute-differences (SAD) and the motion vector field (MVF).
    '''
    mvf = numpy.zeros((b.shape[0], b.shape[1], 3), numpy.int)
    mvf[:,:,2] = UNCONNECTED
    ref = numpy.asarray(b, numpy.float)

    # downsample frame data using Haar wavelet
    w = pywt.Wavelet('haar')
    ha = pywt.wavedec2(a, w, level=hlevel)
    href = pywt.wavedec2(ref, w, level=hlevel)

    # block size and search range at the coarsest level; both double at every refinement step
    hbs = blocksize//2**hlevel
    hsr = searchrange//2**hlevel

    while True:
        total_sad = 0.0
        _2hlevel = 2**hlevel
        for r in xrange(0, href[0].shape[0], hbs):
            for c in xrange(0, href[0].shape[1], hbs):
                rm = r * _2hlevel
                cm = c * _2hlevel
                # center the new search on the vector previously found at the coarser level, if any
                if mvf[rm,cm,2] >= 0:
                    rc, cc = mvf[rm,cm,0]*2 + r, mvf[rm,cm,1]*2 + c
                else:
                    rc, cc = r, c
                rs, cs, sad = _me.me(ha[0], href[0][r:r+hbs,c:c+hbs], rc, cc, hsr)
                mvf[rm:rm+blocksize,cm:cm+blocksize,:] = rs - r, cs - c, int(sad)
                total_sad += sad

        if hlevel == 0:
            break

        # upsample frame data using Haar wavelet
        ha = [pywt.waverec2(ha[:2], w)] + ha[2:]
        href = [pywt.waverec2(href[:2], w)] + href[2:]
        hbs *= 2
        hlevel -= 1

    return total_sad, mvf
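

# --- Illustrative sketch, not part of the original module: exercise the
# pure-Python me() search above on a synthetic frame pair.  The frame size,
# the shift and the name _demo_block_search are made up for this example; the
# C extension _me used by motion_estimation() is assumed to provide the same
# (frame, refblock, rc, cc, sr) -> (row, col, sad) interface as me().
def _demo_block_search(bs=8, sr=8):
    numpy.random.seed(0)
    b = numpy.random.randint(0, 256, (32, 32)).astype(numpy.float)
    a = numpy.zeros((32, 32), numpy.float)
    a[2:, :31] = b[:30, 1:]            # frame a is frame b shifted by (+2, -1)
    refblock = b[8:8+bs, 8:8+bs]       # reference block at (8, 8) in frame b
    r, c, sad = me(a, refblock, 8, 8, sr)
    # the best match should sit at the applied displacement with zero SAD
    print 'match at (%d, %d), vector (%d, %d), SAD %d' % (r, c, r - 8, c - 8, sad)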
def ft_mvf(a, b, mvf, imvf, bs=8):
    '''
    Motion-compensated temporal decomposition between frames a and b using the
    Haar wavelet, applying the given forward and inverse motion fields.
    '''
    H = numpy.empty(a.shape, numpy.float)
    L = numpy.empty(a.shape, numpy.float)
    i0 = numpy.indices((bs,bs))[0]
    i1 = numpy.indices((bs,bs))[1]
    for r in xrange(0, a.shape[0], bs):
        for c in xrange(0, a.shape[1], bs):
            # high-pass: block of a minus its motion-compensated prediction from b
            rm = mvf[r, c, 0] + r
            cm = mvf[r, c, 1] + c
            H[r:r+bs, c:c+bs] = numpy.asarray(a[r:r+bs,c:c+bs], numpy.float) - b[rm:rm+bs,cm:cm+bs]
            # low-pass: average of b and the inverse-motion-compensated a where connected
            rm = r + imvf[r:r+bs,c:c+bs,0] + i0
            cm = c + imvf[r:r+bs,c:c+bs,1] + i1
            _a = a[rm, cm]
            L[r:r+bs, c:c+bs] = numpy.where(
                imvf[r:r+bs, c:c+bs, 2] == UNCONNECTED,
                numpy.asarray(b[r:r+bs, c:c+bs], numpy.float),
                0.5 * (numpy.asarray(b[r:r+bs, c:c+bs], numpy.float) + _a))
    return L, H


def it_mvf(L, H, mvf, imvf, bs=8):
    '''
    Reconstruction of the two frames a and b from the temporal low- and high-pass
    subbands using the Haar wavelet, applying the given forward and inverse motion fields.
    '''
    i0 = numpy.indices((bs,bs))[0]
    i1 = numpy.indices((bs,bs))[1]
    b = numpy.empty(L.shape, numpy.float)
    for r in xrange(0, L.shape[0], bs):
        for c in xrange(0, L.shape[1], bs):
            _L = L[r:r+bs,c:c+bs]
            rm = r + imvf[r:r+bs,c:c+bs,0] + i0
            cm = c + imvf[r:r+bs,c:c+bs,1] + i1
            _H = H[rm, cm]
            b[r:r+bs,c:c+bs] = numpy.where(
                imvf[r:r+bs,c:c+bs,2] == UNCONNECTED,
                _L,
                _L - 0.5 * _H)
    a = numpy.empty(L.shape, numpy.float)
    for r in xrange(0, L.shape[0], bs):
        for c in xrange(0, L.shape[1], bs):
            rm = mvf[r, c, 0] + r
            cm = mvf[r, c, 1] + c
            _H = H[r:r+bs,c:c+bs]
            a[r:r+bs, c:c+bs] = numpy.where(
                mvf[r:r+bs,c:c+bs,2] == MULTIPLE_CONNECTED,
                b[rm:rm+bs,cm:cm+bs] + _H,
                L[rm:rm+bs,cm:cm+bs] + 0.5 * _H)
    return a, b


def _show_mv_dist(mvf, idx=0, level=0, sr=8, fname='mv_dist'):
    # color-code each pixel of the motion field by vector length and connectivity
    import Image, ImageDraw
    im = Image.new('RGB', (mvf.shape[1], mvf.shape[0]))
    d = ImageDraw.Draw(im)
    for r in xrange(mvf.shape[0]):
        for c in xrange(mvf.shape[1]):
            mv = mvf[r][c]
            if sr > 0:
                w = int(math.sqrt(mv[0]**2 + mv[1]**2)*255/(sr*math.sqrt(2.0)))
            else:
                w = 0
            if mv[2] >= 0 or mv[2] == CONNECTED:
                color = (0, w, 0)
            elif mv[2] == UNCONNECTED:
                color = (255, 0, 0)
            elif mv[2] == MULTIPLE_CONNECTED:
                color = (0, 0, w)
            d.point((c, r), fill=color)
    del d
    im.save('%s-%02d-%04d.png' % (fname, level, idx), 'PNG')
    del im


def show_mvf(mvf, imvf, idx=0, level=0, bs=8, sr=8):
    '''
    Visualize the motion field as .png and output motion vectors to .txt.
    '''
    import Image, ImageDraw
    im = Image.new('RGB', (mvf.shape[1]*2, mvf.shape[0]*2))
    d = ImageDraw.Draw(im)
    f = open('mv-%02d-%04d.txt' % (level, idx), 'wt')
    sad = mvf[:,:,2].ravel()
    sad_min = numpy.min(numpy.where(sad < 0.0, 0, sad))
    sad_max = numpy.max(sad)
    for r in xrange(0,mvf.shape[0],bs):
        for c in xrange(0,mvf.shape[1],bs):
            mv = mvf[r][c]
            print >>f, '(%d %d)' % (mv[1], mv[0]),
            # fill block according to SAD
            if sad_max > 0 and mv[2] > 0:
                d.rectangle([(c*2,r*2),(c*2+bs*2,r*2+bs*2)], fill=((mv[2]-sad_min)*255/sad_max,0,0))
            # draw motion vector
            if sr > 0:
                w = math.sqrt(mv[0]**2 + mv[1]**2)/(sr*math.sqrt(2.0))
            else:
                w = 0
            d.line([
                (c*2+bs, r*2+bs),
                (c*2+bs+mv[1]*2, r*2+bs+mv[0]*2)],
                fill=(0,int(32+(255-32)*w),0))
            d.point((c*2+bs, r*2+bs), fill=(255,255,255))
        print >>f
    print >>f
    f.close()
    del d
    im.save('mv-%02d-%04d.png' % (level, idx), 'PNG')
    del im
    _show_mv_dist(mvf, idx, level, sr, 'mvf_dist')
    _show_mv_dist(imvf, idx, level, sr, 'mvi_dist')
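

# --- Illustrative sketch, not part of the original module: check the Haar
# lifting relations implemented by ft_mvf()/it_mvf() above for the trivial
# case of zero motion, where every pixel is connected and L = (a+b)/2,
# H = a-b holds.  The frame size and the name _demo_haar_lifting are made up
# for this example.
def _demo_haar_lifting(bs=8):
    numpy.random.seed(0)
    a = numpy.random.randint(0, 256, (16, 16)).astype(numpy.float)
    b = numpy.random.randint(0, 256, (16, 16)).astype(numpy.float)
    mvf = numpy.zeros((16, 16, 3), numpy.int)    # zero forward motion vectors
    imvf = numpy.zeros((16, 16, 3), numpy.int)   # zero inverse motion vectors ...
    imvf[:,:,2] = CONNECTED                      # ... with every pixel connected
    L, H = ft_mvf(a, b, mvf, imvf, bs)
    ar, br = it_mvf(L, H, mvf, imvf, bs)
    print 'L == (a+b)/2:', numpy.allclose(L, 0.5 * (a + b))
    print 'H == a-b:', numpy.allclose(H, a - b)
    print 'perfect reconstruction:', numpy.allclose(ar, a) and numpy.allclose(br, b)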
def inverse_mvf(mvf, bs=8):
    '''
    Compute the inverse of the motion field.
    '''
    imvf = numpy.zeros((mvf.shape[0], mvf.shape[1], 3), numpy.int)
    imvf[:,:,2] = UNCONNECTED
    for r in xrange(0, mvf.shape[0], bs):
        for c in xrange(0, mvf.shape[1], bs):
            rm = mvf[r,c,0] + r
            cm = mvf[r,c,1] + c
            blockmvf = mvf[r:r+bs,c:c+bs]
            blockimvf = imvf[rm:rm+bs,cm:cm+bs]
            # mark multiple connected in forward motion field if pixel already connected
            numpy.place(blockmvf[:,:,2], blockimvf[:,:,2] > UNCONNECTED, MULTIPLE_CONNECTED)
            # invert motion vector and store in inverse motion field, mark pixel as connected
            unconnected = blockimvf[:,:,2] == UNCONNECTED
            numpy.place(blockimvf[:,:,0], unconnected, -mvf[r,c,0])
            numpy.place(blockimvf[:,:,1], unconnected, -mvf[r,c,1])
            numpy.place(blockimvf[:,:,2], unconnected, CONNECTED)
    return mvf, imvf
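

# --- Illustrative sketch, not part of the original module: show how
# inverse_mvf() above marks connectivity.  Block (0, 0) is given a vector
# pointing at block (0, bs); block (0, bs) keeps a zero vector, so its target
# pixels are already claimed and it is marked multiple-connected in the
# forward field, while nothing maps into block (0, 0), which therefore stays
# unconnected in the inverse field.  The frame size, bs=2 and the name
# _demo_inverse_mvf are made up for this example.
def _demo_inverse_mvf(bs=2):
    mvf = numpy.zeros((2*bs, 2*bs, 3), numpy.int)
    mvf[0:bs, 0:bs, 1] = bs     # block (0, 0) points bs pixels to the right
    mvf, imvf = inverse_mvf(mvf, bs)
    print 'block (0, %d) marked multiple-connected:' % bs, \
        numpy.all(mvf[0:bs, bs:2*bs, 2] == MULTIPLE_CONNECTED)
    print 'block (0, 0) left unconnected in inverse field:', \
        numpy.all(imvf[0:bs, 0:bs, 2] == UNCONNECTED)
    print 'inverse vector stored at the target block:', imvf[0, bs, :2]   # (0, -bs)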
def decompose(seq, bs=8, sr=8, hlevel=2, tlp=MIDDLE):
    # one MCTF level: pair up the frames, estimate motion and split each pair
    # into a temporal low-pass and a high-pass frame
    if len(seq) == 1:
        return seq, [], []
    if tlp == RIGHT:
        left = 0; mid = len(seq); right = 0
    elif tlp == LEFT:
        left = 0; mid = 0; right = len(seq)
    else:
        left = 0; mid = max(len(seq)/2, 2); right = len(seq)
    MVFs = []; Ls = []; Hs = []
    for i in xrange(left, mid, 2):
        sad, mvf = motion_estimation(seq[i+1], seq[i], bs, sr, hlevel)
        mvf, imvf = inverse_mvf(mvf, bs)
        MVFs += [mvf]
        L, H = ft_mvf(seq[i], seq[i+1], mvf, imvf, bs)
        Ls += [L]
        Hs += [H]
    for i in xrange(mid, right, 2):
        sad, mvf = motion_estimation(seq[i], seq[i+1], bs, sr, hlevel)
        mvf, imvf = inverse_mvf(mvf, bs)
        MVFs += [mvf]
        L, H = ft_mvf(seq[i+1], seq[i], mvf, imvf, bs)
        Ls += [L]
        Hs += [H]
    return Ls, Hs, MVFs


def decompose_using_mvf(seq, MVFs, bs=8, tlp=MIDDLE):
    # one MCTF level using precomputed motion fields instead of motion estimation
    if len(seq) == 1:
        return seq, []
    if tlp == RIGHT:
        left = 0; mid = len(seq); right = 0
    elif tlp == LEFT:
        left = 0; mid = 0; right = len(seq)
    else:
        left = 0; mid = max(len(seq)/2, 2); right = len(seq)
    Ls = []; Hs = []
    for i in xrange(left, mid, 2):
        mvf = MVFs[i//2]
        mvf, imvf = inverse_mvf(mvf, bs)
        L, H = ft_mvf(seq[i], seq[i+1], mvf, imvf, bs)
        Ls += [L]
        Hs += [H]
    for i in xrange(mid, right, 2):
        mvf = MVFs[i//2]
        mvf, imvf = inverse_mvf(mvf, bs)
        L, H = ft_mvf(seq[i+1], seq[i], mvf, imvf, bs)
        Ls += [L]
        Hs += [H]
    return Ls, Hs


def decompose_sequence(seq, bs=8, sr=8, hlevel=2, tlp=MIDDLE):
    '''
    Recursively decompose a frame sequence using motion-compensated temporal filtering,
    employing the parameters blocksize, searchrange and hierarchy level for motion estimation.
    Output is [L], [H0, H1, H1, H2, H2, H2, H2], [MVF0, MVF1, MVF1, MVF2, MVF2, MVF2, MVF2]
    for a sequence of length 8.
    The tlp argument moves the temporal low-pass frame to the left, middle or right.
    '''
    Ls, Hs, MVFs = decompose(seq, bs, sr, hlevel, tlp)
    while len(Ls) > 1:
        Ls, hs, mvfs = decompose(Ls, bs, sr, hlevel, tlp)
        Hs = hs + Hs; MVFs = mvfs + MVFs
    return Ls, Hs, MVFs


def decompose_sequence_using_mvf(seq, MVFs, bs=8, tlp=MIDDLE):
    '''
    Recursively decompose a frame sequence using motion-compensated temporal filtering,
    employing the given motion vector fields.
    Output is [L], [H0, H1, H1, H2, H2, H2, H2] for a sequence of length 8.
    The tlp argument moves the temporal low-pass frame to the left, middle or right.
    '''
    Ls, Hs = decompose_using_mvf(seq, MVFs[-len(seq)/2:], bs, tlp)
    del MVFs[-len(seq)/2:]
    while len(Ls) > 1:
        Ls, hs = decompose_using_mvf(Ls, MVFs[-len(Ls)/2:], bs, tlp)
        del MVFs[-len(Ls):]
        Hs = hs + Hs
    return Ls, Hs


def reconstruct(seq, Hs, MVFs, bs=8, tlp=MIDDLE):
    # undo one MCTF level, consuming the leading entries of Hs and MVFs
    if len(Hs) == 0:
        return seq
    if tlp == RIGHT:
        left = 0; mid = len(seq); right = 0
    elif tlp == LEFT:
        left = 0; mid = 0; right = len(seq)
    else:
        left = 0; mid = max(len(seq)/2, 1); right = len(seq)
    Ls = []
    for i in xrange(0, mid):
        mvf = MVFs[0]
        mvf, imvf = inverse_mvf(mvf, bs)
        a, b = it_mvf(seq[i], Hs[0], mvf, imvf, bs)
        Ls += [a] + [b]
        del Hs[0]
        del MVFs[0]
    for i in xrange(mid, right):
        mvf = MVFs[0]
        mvf, imvf = inverse_mvf(mvf, bs)
        a, b = it_mvf(seq[i], Hs[0], mvf, imvf, bs)
        Ls += [b] + [a]
        del Hs[0]
        del MVFs[0]
    return Ls


def reconstruct_sequence(seq, Hs, MVFs, bs=8, tlp=MIDDLE):
    '''
    Recursively reconstruct a frame sequence from the temporal low- and high-pass
    subbands and the motion fields.
    '''
    while len(seq) <= len(Hs):
        seq = reconstruct(seq, Hs, MVFs, bs, tlp)
    return seq
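

# --- Illustrative round-trip sketch, not part of the original module: run the
# decomposition and reconstruction above with all-zero motion fields, so that
# neither the _me extension nor pywt is exercised.  The frame size, the
# sequence length and the helper _zero_mvf() are made up for this example.
def _zero_mvf(shape):
    # forward motion field with zero vectors; channel 2 stands in for the
    # per-block SAD that motion_estimation() would normally store there
    return numpy.zeros((shape[0], shape[1], 3), numpy.int)

if __name__ == '__main__':
    numpy.random.seed(0)
    frames = [numpy.random.randint(0, 256, (16, 16)).astype(numpy.float)
              for i in xrange(4)]
    # a sequence of length 4 needs 3 motion fields: 1 coarse + 2 fine,
    # ordered coarsest first (as produced by decompose_sequence)
    mvfs = [_zero_mvf(frames[0].shape) for i in xrange(3)]
    # both calls consume the lists they are given, so pass copies
    L, Hs = decompose_sequence_using_mvf(list(frames), [m.copy() for m in mvfs])
    rec = reconstruct_sequence(L, Hs, [m.copy() for m in mvfs])
    err = max([numpy.max(numpy.abs(r - f)) for r, f in zip(rec, frames)])
    print 'round-trip max error: %g' % err    # expected to be 0 for zero motion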