Generic Iterative Dataflow Analysis in Python
Here is an example of a remarkably simple yet powerful dataflow analysis technique. The algorithm is very generic and can be used to implement a number of forward and backward analyses such as constant propagation, reaching definitions, value-set analysis, or in my case type inference.
The algorithm, adapted from the maximal fixedpoint algorithm in the dragon book, takes a control flow graph as input and outputs IN and OUT (maps from basic block to abstract state at their entry and exit (an abstract state maps variables to abstract values)). ItĀ is parametric, you must supply it with a few functions that will determine the output of your analysis:
- analysis.meet, takes two abstract states and returns an abstract state (see lattices). To guarantee that the algorithm terminates, this function should be monotone (and your lattice of abstract values of finite height).
- analysis.step_forward (resp. analysis.step_backward), a function that takes an instruction and an abstract state at its entry (resp. exit) and “executes” it, transforming the abstract state. They are used to automatically compute the transfer function for each basic block in the cfg.
def forward_transfer_function(analysis, bb, IN_bb):
OUT_bb = IN_bb.copy()
for insn in bb:
analysis.step_forward(insn, OUT_bb)
return OUT_bb
def backward_transfer_function(analysis, bb, OUT_bb):
IN_bb = OUT_bb.copy()
for insn in reversed(bb):
analysis.step_backward(insn, IN_bb)
return IN_bb
def update(env, bb, newval, todo_set, todo_candidates):
if newval != env[bb]:
print '{0} has changed, adding {1}'.format(bb, todo_candidates)
env[bb] = newval
todo_set |= todo_candidates
def maximal_fixed_point(analysis, cfg, init={}):
# state at the entry and exit of each basic block
IN, OUT = {}, {}
for bb in cfg.nodes:
IN[bb] = {}
OUT[bb] = {}
IN[cfg.entry_point] = init
# first make a pass over each basic block
todo_forward = cfg.nodes
todo_backward = cfg.nodes
while todo_backward or todo_forward:
while todo_forward:
bb = todo_forward.pop()
####
# compute the environment at the entry of this BB
new_IN = reduce(analysis.meet, map(OUT.get, cfg.pred[bb]), IN[bb])
update(IN, bb, new_IN, todo_backward, cfg.pred[bb])
####
# propagate information for this basic block
new_OUT = forward_transfer_function(analysis, bb, IN[bb])
update(OUT, bb, new_OUT, todo_forward, cfg.succ[bb])
while todo_backward:
bb = todo_backward.pop()
####
# compute the environment at the exit of this BB
new_OUT = reduce(analysis.meet, map(IN.get, succ[bb]), OUT[bb])
update(OUT, bb, new_OUT, todo_forward, cfg.succ[bb])
####
# propagate information for this basic block (backwards)
new_IN = backward_transfer_function(analysis, bb, OUT[bb])
update(IN, bb, new_IN, todo_backward, cfg.pred[bb])
####
# IN and OUT have converged
return IN, OUT
- starts with an empty state at some arbitrary basic block
- makes a forward pass and a backward pass over each basic block, adding the successors/predecessors to a worklist when changes are detected
- continues until the worklist is empty.
- OUT(B0) = [a->{1}]
- OUT(B1) = [a-> {-1}]
- then meet could output IN(B2) = [a -> {1, -1}]
def meet_val(lhs, rhs):
result = None
if lhs == 'NAC' or rhs == 'NAC':
result = 'NAC'
elif lhs == 'UNDEF' or rhs == 'UNDEF':
result = 'UNDEF'
else:
result = 'CONST'
return result
def meet_env(lhs, rhs):
lhs_keys = set(lhs.keys())
rhs_keys = set(rhs.keys())
result = {}
for var in lhs_keys - rhs_keys:
result[var] = lhs[var]
for var in rhs_keys - lhs_keys:
result[var] = rhs[var]
for var in lhs_keys & rhs_keys:
result[var] = meet_val(lhs[var], rhs[var])
return result
def abstract_value(env, expr):
if expr.isdigit():
return 'CONST'
try:
return env[expr]
except KeyError:
return 'UNDEF'
def step_forward(insn, env_in):
if type(insn) == str:
return
var, op, expr = insn
# insn is var = c
if len(expr) == 1:
env_in[var] = abstract_value(env_in, expr)
else:
e1, op, e2 = expr
val1 = abstract_value(env_in, e1)
val2 = abstract_value(env_in, e2)
env_in[var] = meet_val(val1, val2)
def step_backward(insn, env_in):
pass
The function step_forward defines the abstract semantics for the statements or instructions of the language you want to analyze and for the analysis you want to implement. For instance here we only collect if a variable at some program point is constant, undefined, or not a constant (NAC). To do the actual propagation, we could also collect the allocation site of the constant.
Let’s consider a super simple language, where variables are numbers that can only be affected to or added together. The function meet_val computes the meet for two abstract values, according to this table:
UNDEF CONST NAC
-----------------
UNDEF | UNDEF UNDEF NAC
CONST | UNDEF CONST NAC
NAC | NAC NAC NAC
Let’s consider a simple program in this “language” where we don’t specify the constructs for the control flow. The algorithm just assumes that every edge in the CFG is reachable. This is obviously not the case in practice, but that only means that we are going to miss some patterns (the analysis is sound but imprecise in order to terminate).
import networkx as nx
class SomeObject:
pass
def instructionify(somestr):
toks = somestr.split()
if '+' in somestr:
return (toks[0], toks[1], (toks[2], toks[3], toks[4]))
return tuple(somestr.split())
# setup the program's cfg
prog = nx.DiGraph()
s0 = ('entry'),
s1 = instructionify('b = x'),
s2 = instructionify('c = 2'),
s3 = instructionify('a = 40 + c'),
s4 = instructionify('ret = a + x'),
prog.add_edge(s0, s1)
prog.add_edge(s1, s2)
prog.add_edge(s2, s1)
prog.add_edge(s1, s3)
prog.add_edge(s3, s3)
prog.add_edge(s3, s4)
# initialize pred and succ
pred, succ = {}, {}
for bb in prog:
pred[bb] = set(prog.predecessors(bb))
succ[bb] = set(prog.successors(bb))
cfg = SomeObject()
cfg.nodes = set(prog.nodes())
cfg.pred = pred
cfg.succ = succ
cfg.entry_point = s0
analysis = SomeObject()
analysis.meet = meet_env
analysis.step_forward = step_forward
analysis.step_backward = step_backward
# run the whole thing
IN, OUT = maximal_fixed_point(analysis, cfg)
print 'a at program point s3 is', OUT[s3]['a']
print 'ret at program point s4 is', OUT[s4]['ret']
And the output is:
a at program point s3 is CONST ret at program point s4 is UNDEF
As a final note: it is possible to speed things up a bit by choosing a better ordering for basic blocks than just going randomly at first (because we initially fail to propagate lots of information). This might end up in another blog post. Cheers!

The OCaml version is particularly terse: http://pastebin.com/fsbTFL60 (DSE, not CP)
Rolf Rolles
April 25, 2011 at 12:06
Indeed. The only thing I really lack in Python compared to OCaml is the match .. with operator. Apart from that, I’m happy with the functional operations (map/reduce/filter) coupled with set operations.
dan
April 25, 2011 at 13:26