Mercurial > hg > pymctf
view pymctf.py @ 0:4214d9245f8e
import
author | Peter Meerwald <pmeerw@cosy.sbg.ac.at> |
---|---|
date | Thu, 06 Sep 2007 13:45:48 +0200 |
parents | |
children | b67c5ec1a9f0 |
line wrap: on
line source
# MCTF following Ohm04a import Image import ImageDraw import pywt import numpy import math import sys import time import _me # type of motion vectors UNCONNECTED = -(sys.maxint) CONNECTED = -(sys.maxint-1) MULTIPLE_CONNECTED = -(sys.maxint-2) # temporal low-pass frame position LEFT = -1 MIDDLE = 0 RIGHT = 1 def me(a, refblock, rc, cc, sr): min_sad = sys.maxint min_r, min_c = 0, 0 bs = refblock.shape[0] for rs in xrange(max(0,rc-sr),min(a.shape[0]-bs,rc+sr)+1): for cs in xrange(max(0,cc-sr),min(cc+sr,a.shape[1]-bs)+1): sad = numpy.sum(numpy.abs(refblock - a[rs:rs+bs, cs:cs+bs])) if sad < min_sad: # found new local block SAD minimum, store motion vector min_r, min_c, min_sad = rs, cs, sad return min_r, min_c, min_sad def motion_estimation(a, b, blocksize=8, searchrange=8, hlevel=2): ''' Hierarchical motion estimation from frame a to frame b. Parameters are blocksize, searchrange and search hierarchy level. Precision is full pixel only. Returns the sum-of-absolute-differences (SAD) and the motion vector field (MVF). ''' mvf = numpy.zeros((b.shape[0], b.shape[1], 3), numpy.int) mvf[:,:,2] = UNCONNECTED ref = numpy.asarray(b, numpy.float) # downsample frame data using Haar wavelet w = pywt.Wavelet('haar') ha = pywt.wavedec2(a, w, level=hlevel) href = pywt.wavedec2(ref, w, level=hlevel) # grows by 2 for every level hbs = blocksize//2**hlevel hsr = searchrange//2**hlevel while True: total_sad = 0.0 _2hlevel = 2**hlevel for r in xrange(0, href[0].shape[0], hbs): for c in xrange(0, href[0].shape[1], hbs): rm = r * _2hlevel cm = c * _2hlevel # set center of new search of previously found vector at higher level if mvf[rm,cm,2] >= 0: rc, cc = mvf[rm,cm,0]*2 + r, mvf[rm,cm,1]*2 + c else: rc, cc = r, c rs, cs, sad = _me.me(ha[0], href[0][r:r+hbs,c:c+hbs], rc, cc, hsr) mvf[rm:rm+blocksize,cm:cm+blocksize,:] = rs - r, cs - c, int(sad) total_sad += sad if hlevel == 0: break # upsample frame data using Haar wavelet ha = [pywt.waverec2(ha[:2], w)] + ha[2:] href = [pywt.waverec2(href[:2], w)] + href[2:] hbs *= 2 hlevel -= 1 return total_sad, mvf def ft_mvf(a, b, mvf, imvf, bs=8): ''' Motion-compensated temporal decomposition between frame a and b using Haar wavelet applying a given forward and inverse motion field. ''' H = numpy.empty(a.shape, numpy.float) L = numpy.empty(a.shape, numpy.float) i0 = numpy.indices((bs,bs))[0] i1 = numpy.indices((bs,bs))[1] for r in xrange(0, a.shape[0], bs): for c in xrange(0, a.shape[1], bs): rm = mvf[r, c, 0] + r cm = mvf[r, c, 1] + c H[r:r+bs, c:c+bs] = numpy.asarray(a[r:r+bs,c:c+bs], numpy.float) - b[rm:rm+bs,cm:cm+bs] rm = r + imvf[r:r+bs,c:c+bs,0] + i0 cm = c + imvf[r:r+bs,c:c+bs,1] + i1 _a = a[rm, cm] L[r:r+bs, c:c+bs] = numpy.where( \ imvf[r:r+bs, c:c+bs, 2] == UNCONNECTED, \ numpy.asarray(b[r:r+bs, c:c+bs], numpy.float), \ 0.5 * (numpy.asarray(b[r:r+bs, c:c+bs], numpy.float) + _a)) return L, H def it_mvf(L, H, mvf, imvf, bs=8): ''' Reconstruction of two frames a and b from temporal low- and high-pass subband using Haar wavelet and applying the given forward and inverse motion field. ''' i0 = numpy.indices((bs,bs))[0] i1 = numpy.indices((bs,bs))[1] b = numpy.empty(L.shape, numpy.float) for r in xrange(0, L.shape[0], bs): for c in xrange(0, L.shape[1], bs): _L = L[r:r+bs,c:c+bs] rm = r + imvf[r:r+bs,c:c+bs,0] + i0 cm = c + imvf[r:r+bs,c:c+bs,1] + i1 _H = H[rm, cm] b[r:r+bs,c:c+bs] = numpy.where( \ imvf[r:r+bs,c:c+bs,2] == UNCONNECTED, \ _L, \ _L - 0.5 * _H) a = numpy.empty(L.shape, numpy.float) for r in xrange(0, L.shape[0], bs): for c in xrange(0, L.shape[1], bs): rm = mvf[r, c, 0] + r cm = mvf[r, c, 1] + c _H = H[r:r+bs,c:c+bs] a[r:r+bs, c:c+bs] = numpy.where( \ mvf[r:r+bs,c:c+bs,2] == MULTIPLE_CONNECTED, \ b[rm:rm+bs,cm:cm+bs] + _H, \ L[rm:rm+bs,cm:cm+bs] + 0.5 * _H) return a, b def _show_mv_dist(mvf, idx=0, level=0, sr=8, fname='mv_dist'): im = Image.new('RGB', (mvf.shape[1], mvf.shape[0])) d = ImageDraw.Draw(im) for r in xrange(mvf.shape[0]): for c in xrange(mvf.shape[1]): mv = mvf[r][c] if sr > 0: w = int(math.sqrt(mv[0]**2 + mv[1]**2)*255/(sr*math.sqrt(2.0))) else: w = 0 if mv[2] >= 0 or mv[2] == CONNECTED: color = (0, w, 0) elif mv[2] == UNCONNECTED: color = (255, 0, 0) elif mv[2] == MULTIPLE_CONNECTED: color = (0, 0, w) d.point((c, r), fill=color) del d im.save('%s-%02d-%04d.png' % (fname, level, idx), 'PNG') del im def show_mvf(mvf, imvf, idx=0, level=0, bs=8, sr=8): ''' Visualize the motion field as .png and output motion vectors to .txt. ''' im = Image.new('RGB', (mvf.shape[1]*2, mvf.shape[0]*2)) d = ImageDraw.Draw(im) f = open('mv-%02d-%04d.txt' % (level, idx), 'wt') sad = mvf[:,:,2].ravel() sad_min = numpy.min(numpy.where(sad < 0.0, 0, sad)) sad_max = numpy.max(sad) for r in xrange(0,mvf.shape[0],bs): for c in xrange(0,mvf.shape[1],bs): mv = mvf[r][c] print >>f, '(%d %d)' % (mv[1], mv[0]), # fill block according to SAD if sad_max > 0 and mv[2] > 0: d.rectangle([(c*2,r*2),(c*2+bs*2,r*2+bs*2)], fill=((mv[2]-sad_min)*255/sad_max,0,0)) # draw motion vector if sr > 0: w = int(math.sqrt(mv[0]**2 + mv[1]**2)/(sr*math.sqrt(2.0))) else: w = 0 d.line([ \ (c*2+bs, r*2+bs), \ (c*2+bs+mv[1]*2, r*2+bs+mv[0]*2)], \ fill=(0,int(32+(255-32)*w),0)) d.point((c*2+bs, r*2+bs), fill=(255,255,255)) print >>f print >>f f.close() del d im.save('mv-%02d-%04d.png' % (level, idx), 'PNG') del im _show_mv_dist(mvf, idx, level, sr, 'mvf_dist') _show_mv_dist(imvf, idx, level, sr, 'mvi_dist') def inverse_mvf(mvf, bs=8): ''' Compute the inverse of the motion field. ''' imvf = numpy.zeros((mvf.shape[0], mvf.shape[1], 3), numpy.int) imvf[:,:,2] = UNCONNECTED for r in xrange(0, mvf.shape[0], bs): for c in xrange(0, mvf.shape[1], bs): rm = mvf[r,c,0] + r cm = mvf[r,c,1] + c blockmvf = mvf[r:r+bs,c:c+bs] blockimvf = imvf[rm:rm+bs,cm:cm+bs] # mark multiple connected in forward motion field if pixel already connected numpy.place(blockmvf[:,:,2], blockimvf[:,:,2] > UNCONNECTED, MULTIPLE_CONNECTED) # invert motion vector and store in inverse motion field, mark pixel as connected unconnected = blockimvf[:,:,2] == UNCONNECTED numpy.place(blockimvf[:,:,0], unconnected, -mvf[r,c,0]) numpy.place(blockimvf[:,:,1], unconnected, -mvf[r,c,1]) numpy.place(blockimvf[:,:,2], unconnected, CONNECTED) return mvf, imvf def decompose_sequence(seq, Hs=[], MVFs=[], bs=8, sr=8, hlevel=2, tlp=MIDDLE, visualize_mvf=False, dlevel=-1): ''' Recursively decompose frame sequence using motion-compensated temporal filtering employing the parameters blocksize, searchrange and hierarchy level for motion estimation. Output is [L], [H0, H1, H1, H2, H2, H2, H2], [MVF0, MVF1, MVF1, MVF2, MVF2, MVF2, MVF2] for a sequence of length 8. The tlp argument allows to move the temporal low-pass frame to the left, middle or right. ''' Ls = [] if dlevel < 0: dlevel = int(math.log(len(seq), 2)) if len(seq) == 1: return seq, Hs, MVFs if tlp == RIGHT: left = 0; mid = len(seq); right = 0 elif tlp == LEFT: left = 0; mid = 0; right = len(seq) else: left = 0; mid = max(len(seq)/2, 2); right = len(seq) for i in xrange(left, mid, 2): sad, mvf = motion_estimation(seq[i+1], seq[i], bs, sr, hlevel) mvf, imvf = inverse_mvf(mvf, bs) if visualize_mvf: show_mvf(mvf, imvf, i, dlevel-1, bs, sr) MVFs.insert(i//2, mvf) L, H = ft_mvf(seq[i], seq[i+1], mvf, imvf, bs) Ls.append(L) Hs.insert(i//2, H) for i in xrange(mid, right, 2): sad, mvf = motion_estimation(seq[i], seq[i+1], bs, sr, hlevel) mvf, imvf = inverse_mvf(mvf, bs) if visualize_mvf: show_mvf(mvf, imvf, i, dlevel-1, bs, sr) MVFs.insert(i//2, mvf) L, H = ft_mvf(seq[i+1], seq[i], mvf, imvf, bs) Ls.append(L) Hs.insert(i//2, H) return decompose_sequence(Ls, Hs, MVFs, bs, sr, hlevel, tlp, visualize_mvf, dlevel-1) def reconstruct_sequence(seq, Hs, MVFs, bs=8, tlp=MIDDLE): ''' Recursively reconstruct a frame sequence from temporal low- and high-pass subbands and motion fields. ''' Ls = [] if len(Hs) == 0: return seq if tlp == RIGHT: left = 0; mid = len(seq); right = 0 elif tlp == LEFT: left = 0; mid = 0; right = len(seq) else: left = 0; mid = max(len(seq)/2, 1); right = len(seq) for i in xrange(0, mid): mvf = MVFs[0] mvf, imvf = inverse_mvf(mvf, bs) a, b = it_mvf(seq[i], Hs[0], mvf, imvf, bs) Ls += [a] + [b] del Hs[0] del MVFs[0] for i in xrange(mid, right): mvf = MVFs[0] mvf, imvf = inverse_mvf(mvf, bs) a, b = it_mvf(seq[i], Hs[0], mvf, imvf, bs) Ls += [b] + [a] del Hs[0] del MVFs[0] return reconstruct_sequence(Ls, Hs, MVFs, bs, tlp)