Here is an example of a remarkably simple yet powerful dataflow analysis technique. The algorithm is very generic and can be used to implement a number of forward and backward analyses such as constant propagation, reaching definitions, value-set analysis, or in my case type inference.
The algorithm, adapted from the maximal fixed-point algorithm in the Dragon Book, takes a control flow graph as input and outputs IN and OUT: maps from each basic block to the abstract state at its entry and at its exit, respectively. An abstract state maps variables to abstract values. The algorithm is parametric: you must supply it with a few functions that determine the output of your analysis:
- analysis.meet, takes two abstract states and returns an abstract state (see lattices). To guarantee that the algorithm terminates, this function should be monotone, as should the transfer functions, and your lattice of abstract values should have finite height.
- analysis.step_forward (resp. analysis.step_backward), a function that takes an instruction and an abstract state at its entry (resp. exit) and “executes” it, transforming the abstract state. They are used to automatically compute the transfer function for each basic block in the cfg.
    from functools import reduce


    def forward_transfer_function(analysis, bb, IN_bb):
        # abstractly execute the block, top to bottom, on a copy of its entry state
        OUT_bb = IN_bb.copy()
        for insn in bb:
            analysis.step_forward(insn, OUT_bb)
        return OUT_bb


    def backward_transfer_function(analysis, bb, OUT_bb):
        # same thing bottom to top, starting from a copy of the exit state
        IN_bb = OUT_bb.copy()
        for insn in reversed(bb):
            analysis.step_backward(insn, IN_bb)
        return IN_bb


    def update(env, bb, newval, todo_set, todo_candidates):
        # record the new state and schedule the neighbours that depend on it
        if newval != env[bb]:
            print('{0} has changed, adding {1}'.format(bb, todo_candidates))
            env[bb] = newval
            todo_set |= todo_candidates


    def maximal_fixed_point(analysis, cfg, init=None):
        # state at the entry and exit of each basic block
        IN, OUT = {}, {}
        for bb in cfg.nodes:
            IN[bb] = {}
            OUT[bb] = {}
        IN[cfg.entry_point] = {} if init is None else init

        # first make a pass over each basic block
        # (both names refer to one shared set copied from cfg.nodes: a single
        # worklist serves both directions, and cfg.nodes itself is left untouched)
        todo_forward = todo_backward = set(cfg.nodes)

        while todo_backward or todo_forward:
            while todo_forward:
                bb = todo_forward.pop()

                ####
                # compute the environment at the entry of this BB
                new_IN = reduce(analysis.meet, map(OUT.get, cfg.pred[bb]), IN[bb])
                update(IN, bb, new_IN, todo_backward, cfg.pred[bb])

                ####
                # propagate information for this basic block
                new_OUT = forward_transfer_function(analysis, bb, IN[bb])
                update(OUT, bb, new_OUT, todo_forward, cfg.succ[bb])

            while todo_backward:
                bb = todo_backward.pop()

                ####
                # compute the environment at the exit of this BB
                new_OUT = reduce(analysis.meet, map(IN.get, cfg.succ[bb]), OUT[bb])
                update(OUT, bb, new_OUT, todo_forward, cfg.succ[bb])

                ####
                # propagate information for this basic block (backwards)
                new_IN = backward_transfer_function(analysis, bb, OUT[bb])
                update(IN, bb, new_IN, todo_backward, cfg.pred[bb])

        ####
        # IN and OUT have converged
        return IN, OUT
In short, the algorithm:
- starts with an empty state and picks an arbitrary basic block from the worklist
- makes a forward pass and a backward pass over each basic block, adding the successors/predecessors to a worklist when changes are detected
- continues until the worklist is empty.
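As an example of what meet does, take sets of possible values as abstract values and a block B2 with predecessors B0 and B1. If:
- OUT(B0) = [a -> {1}]
- OUT(B1) = [a -> {-1}]
- then meet could output IN(B2) = [a -> {1, -1}]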
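The meet example above can be sketched in a few lines. This is only an illustration: it assumes each abstract value is a Python set of possible integers and that meet is a plain set union, and the names `meet_value_sets`, `OUT_B0`, `OUT_B1` and `IN_B2` are made up for the occasion.

```python
def meet_value_sets(lhs, rhs):
    """Meet two abstract states mapping variable -> set of possible values.

    Assumption: merging two control-flow paths unions the value sets; a
    variable known on only one side keeps the values from that side.
    """
    result = {}
    for var in set(lhs) | set(rhs):
        result[var] = lhs.get(var, set()) | rhs.get(var, set())
    return result


# Hypothetical states at the exit of the two predecessors of B2.
OUT_B0 = {'a': {1}}
OUT_B1 = {'a': {-1}}

IN_B2 = meet_value_sets(OUT_B0, OUT_B1)
print(IN_B2)
```

Note that a lattice of integer sets like this one has infinite height, so a real value-set analysis would need some bound or widening to keep the termination guarantee.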
    def meet_val(lhs, rhs):
        # NAC wins over everything, then UNDEF, then CONST
        if lhs == 'NAC' or rhs == 'NAC':
            return 'NAC'
        if lhs == 'UNDEF' or rhs == 'UNDEF':
            return 'UNDEF'
        return 'CONST'


    def meet_env(lhs, rhs):
        lhs_keys = set(lhs.keys())
        rhs_keys = set(rhs.keys())
        result = {}

        # variables known on one side only are kept as they are
        for var in lhs_keys - rhs_keys:
            result[var] = lhs[var]
        for var in rhs_keys - lhs_keys:
            result[var] = rhs[var]

        # variables known on both sides are merged
        for var in lhs_keys & rhs_keys:
            result[var] = meet_val(lhs[var], rhs[var])

        return result


    def abstract_value(env, expr):
        if expr.isdigit():
            return 'CONST'
        try:
            return env[expr]
        except KeyError:
            return 'UNDEF'


    def step_forward(insn, env_in):
        # markers such as 'entry' are plain strings and have no effect
        if isinstance(insn, str):
            return

        var, op, expr = insn

        if isinstance(expr, str):
            # insn is var = c
            env_in[var] = abstract_value(env_in, expr)
        else:
            # insn is var = e1 + e2
            e1, op, e2 = expr
            val1 = abstract_value(env_in, e1)
            val2 = abstract_value(env_in, e2)
            env_in[var] = meet_val(val1, val2)


    def step_backward(insn, env_in):
        pass
The function step_forward defines the abstract semantics of the statements or instructions of the language you want to analyze, for the analysis you want to implement. For instance, here we only record whether a variable at some program point is constant, undefined, or not a constant (NAC). To do the actual propagation, we could also collect the allocation site of the constant.
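To give an idea of what "actual propagation" could look like, here is a rough sketch that tracks the value of each constant (rather than its allocation site). Everything in it is an assumption made for illustration: the names `meet_const`, `eval_operand` and `step_forward_values` are hypothetical, abstract values are either 'UNDEF', 'NAC' or a Python int, and the meet keeps the same precedence as meet_val (NAC first, then UNDEF).

```python
UNDEF, NAC = 'UNDEF', 'NAC'


def meet_const(lhs, rhs):
    """Meet for abstract values that are UNDEF, NAC, or a concrete int.

    Assumption: NAC wins, then UNDEF; two different constants meet to NAC.
    """
    if lhs == NAC or rhs == NAC:
        return NAC
    if lhs == UNDEF or rhs == UNDEF:
        return UNDEF
    return lhs if lhs == rhs else NAC


def eval_operand(env, operand):
    """Abstract value of a literal or of a variable name."""
    if operand.isdigit():
        return int(operand)
    return env.get(operand, UNDEF)


def step_forward_values(insn, env):
    """Abstractly execute `var = e` or `var = e1 + e2`, tracking values."""
    var, _, expr = insn
    if isinstance(expr, str):
        env[var] = eval_operand(env, expr)
        return
    e1, _, e2 = expr
    v1, v2 = eval_operand(env, e1), eval_operand(env, e2)
    if NAC in (v1, v2):
        env[var] = NAC
    elif UNDEF in (v1, v2):
        env[var] = UNDEF
    else:
        env[var] = v1 + v2  # both operands are known ints: fold the addition


env = {}
step_forward_values(('c', '=', '2'), env)
step_forward_values(('a', '=', ('40', '+', 'c')), env)
step_forward_values(('ret', '=', ('a', '+', 'x')), env)
print(env)
```

On this straight-line sequence, a is folded to 42 while ret stays UNDEF because x is never assigned.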

Let’s consider a super simple language, where variables hold numbers that can only be assigned or added together. The function meet_val computes the meet of two abstract values, according to this table:
            UNDEF  CONST  NAC
          -------------------
    UNDEF | UNDEF  UNDEF  NAC
    CONST | UNDEF  CONST  NAC
    NAC   | NAC    NAC    NAC
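One way to double-check a table like this is to regenerate it from an ordering of the abstract values. The sketch below assumes the ordering NAC < UNDEF < CONST, which is simply read off the table; `RANK` and `meet_by_rank` are hypothetical names, not part of the code in this post.

```python
# Rank the abstract values so that the meet is simply the lowest-ranked one.
# Assumption: the ordering NAC < UNDEF < CONST is read off the table above.
RANK = {'NAC': 0, 'UNDEF': 1, 'CONST': 2}


def meet_by_rank(lhs, rhs):
    """Return whichever abstract value sits lower in the ordering."""
    return min(lhs, rhs, key=RANK.get)


VALUES = ['UNDEF', 'CONST', 'NAC']
table = {(l, r): meet_by_rank(l, r) for l in VALUES for r in VALUES}

# Print one row per left-hand value, in the same column order as the table.
for l in VALUES:
    print(l.ljust(5), '|', '  '.join(table[l, r].ljust(5) for r in VALUES))
```

Seeing the meet as a minimum also makes it easy to check that it is commutative and idempotent.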
Let’s consider a simple program in this “language” where we don’t specify the constructs for the control flow. The algorithm just assumes that every edge in the CFG can be taken. This is obviously not the case in practice, but it only means that we are going to miss some patterns (the analysis is sound but imprecise in order to terminate).
    import networkx as nx


    class SomeObject:
        pass


    def instructionify(somestr):
        # 'a = 40 + c' -> ('a', '=', ('40', '+', 'c')); 'c = 2' -> ('c', '=', '2')
        toks = somestr.split()
        if '+' in somestr:
            return (toks[0], toks[1], (toks[2], toks[3], toks[4]))
        return tuple(toks)


    # setup the program's cfg
    # (each basic block is a tuple of instructions, here with a single element)
    prog = nx.DiGraph()
    s0 = ('entry',)
    s1 = (instructionify('b = x'),)
    s2 = (instructionify('c = 2'),)
    s3 = (instructionify('a = 40 + c'),)
    s4 = (instructionify('ret = a + x'),)
    prog.add_edge(s0, s1)
    prog.add_edge(s1, s2)
    prog.add_edge(s2, s1)
    prog.add_edge(s1, s3)
    prog.add_edge(s3, s3)
    prog.add_edge(s3, s4)

    # initialize pred and succ
    pred, succ = {}, {}
    for bb in prog:
        pred[bb] = set(prog.predecessors(bb))
        succ[bb] = set(prog.successors(bb))

    cfg = SomeObject()
    cfg.nodes = set(prog.nodes())
    cfg.pred = pred
    cfg.succ = succ
    cfg.entry_point = s0

    analysis = SomeObject()
    analysis.meet = meet_env
    analysis.step_forward = step_forward
    analysis.step_backward = step_backward

    # run the whole thing
    IN, OUT = maximal_fixed_point(analysis, cfg)
    print('a at program point s3 is {0}'.format(OUT[s3]['a']))
    print('ret at program point s4 is {0}'.format(OUT[s4]['ret']))
And the output (ignoring the debug lines printed by update) is:
    a at program point s3 is CONST
    ret at program point s4 is UNDEF
As a final note: it is possible to speed things up a bit by choosing a better ordering for basic blocks than just going randomly at first (because we initially fail to propagate lots of information). This might end up in another blog post. Cheers!