Here is an example of a remarkably simple yet powerful dataflow analysis technique. The algorithm is very generic and can be used to implement a number of forward and backward analyses such as constant propagation, reaching definitions, value-set analysis, or, in my case, type inference.
The algorithm, adapted from the maximal fixedpoint algorithm in the dragon book, takes a control flow graph as input and outputs IN and OUT: maps from each basic block to the abstract state at its entry and exit, where an abstract state maps variables to abstract values. It is parametric: you must supply a few functions that determine the output of your analysis:
- analysis.meet takes two abstract states and returns an abstract state (see lattices). To guarantee that the algorithm terminates, this function should be monotone (and your lattice of abstract values must be of finite height).
- analysis.step_forward (resp. analysis.step_backward) takes an instruction and the abstract state at its entry (resp. exit) and “executes” it, transforming the abstract state in place. These are used to automatically compute the transfer function for each basic block in the CFG.
```
def forward_transfer_function(analysis, bb, IN_bb):
    OUT_bb = IN_bb.copy()
    for insn in bb:
        analysis.step_forward(insn, OUT_bb)
    return OUT_bb

def backward_transfer_function(analysis, bb, OUT_bb):
    IN_bb = OUT_bb.copy()
    for insn in reversed(bb):
        analysis.step_backward(insn, IN_bb)
    return IN_bb

def update(env, bb, newval, todo_set, todo_candidates):
    if newval != env[bb]:
        print '{0} has changed, adding {1}'.format(bb, todo_candidates)
        env[bb] = newval
        todo_set |= todo_candidates

def maximal_fixed_point(analysis, cfg, init={}):
    # state at the entry and exit of each basic block
    IN, OUT = {}, {}
    for bb in cfg.nodes:
        IN[bb] = {}
        OUT[bb] = {}
    IN[cfg.entry_point] = init

    # first make a pass over each basic block
    todo_forward = cfg.nodes
    todo_backward = cfg.nodes

    while todo_backward or todo_forward:

        while todo_forward:
            bb = todo_forward.pop()

            ####
            # compute the environment at the entry of this BB
            new_IN = reduce(analysis.meet, map(OUT.get, cfg.pred[bb]), IN[bb])
            update(IN, bb, new_IN, todo_backward, cfg.pred[bb])

            ####
            # propagate information for this basic block
            new_OUT = forward_transfer_function(analysis, bb, IN[bb])
            update(OUT, bb, new_OUT, todo_forward, cfg.succ[bb])

        while todo_backward:
            bb = todo_backward.pop()

            ####
            # compute the environment at the exit of this BB
            new_OUT = reduce(analysis.meet, map(IN.get, succ[bb]), OUT[bb])
            update(OUT, bb, new_OUT, todo_forward, cfg.succ[bb])

            ####
            # propagate information for this basic block (backwards)
            new_IN = backward_transfer_function(analysis, bb, OUT[bb])
            update(IN, bb, new_IN, todo_backward, cfg.pred[bb])

    ####
    # IN and OUT have converged
    return IN, OUT
```
In short, the algorithm:
- starts with an empty state at some arbitrary basic block
- makes a forward pass and a backward pass over each basic block, adding the successors/predecessors to a worklist when changes are detected
- continues until the worklists are empty.
For example, if two basic blocks B0 and B1 both lead into B2 and the analysis computed:
- OUT(B0) = [a -> {1}]
- OUT(B1) = [a -> {-1}]
- then meet could output IN(B2) = [a -> {1, -1}] (a sketch of such a set-union meet follows this list).
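To make that concrete, here is a minimal sketch of what such a set-union meet might look like for a value-set style analysis. The names meet_val_set and meet_env_set are made up for this illustration and are not part of the code above:

```
def meet_val_set(lhs, rhs):
    # join two value sets by set union, e.g. {1} and {-1} -> {1, -1}
    return lhs | rhs

def meet_env_set(lhs, rhs):
    # pointwise meet of two abstract states (variable -> set of values)
    result = dict(lhs)
    for var, vals in rhs.items():
        result[var] = meet_val_set(result.get(var, set()), vals)
    return result

# OUT(B0) = [a -> {1}], OUT(B1) = [a -> {-1}]
IN_B2 = meet_env_set({'a': {1}}, {'a': {-1}})   # {'a': {1, -1}}
```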
```
def meet_val(lhs, rhs):
    result = None

    if lhs == 'NAC' or rhs == 'NAC':
        result = 'NAC'
    elif lhs == 'UNDEF' or rhs == 'UNDEF':
        result = 'UNDEF'
    else:
        result = 'CONST'

    return result

def meet_env(lhs, rhs):
    lhs_keys = set(lhs.keys())
    rhs_keys = set(rhs.keys())
    result = {}

    for var in lhs_keys - rhs_keys:
        result[var] = lhs[var]

    for var in rhs_keys - lhs_keys:
        result[var] = rhs[var]

    for var in lhs_keys & rhs_keys:
        result[var] = meet_val(lhs[var], rhs[var])

    return result

def abstract_value(env, expr):
    if expr.isdigit():
        return 'CONST'

    try:
        return env[expr]
    except KeyError:
        return 'UNDEF'

def step_forward(insn, env_in):
    if type(insn) == str:
        return

    var, op, expr = insn

    # insn is var = c
    if len(expr) == 1:
        env_in[var] = abstract_value(env_in, expr)
    else:
        e1, op, e2 = expr
        val1 = abstract_value(env_in, e1)
        val2 = abstract_value(env_in, e2)
        env_in[var] = meet_val(val1, val2)

def step_backward(insn, env_in):
    pass
```
The function step_forward defines the abstract semantics for the statements or instructions of the language you want to analyze, and for the analysis you want to implement. For instance, here we only track whether a variable at some program point is constant, undefined, or not a constant (NAC). To do the actual propagation, we could also collect the allocation site of the constant.
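As an illustration only (not part of the original code), here is a hedged sketch of a step_forward variant that also records the concrete value of constants, so that a later pass could actually rewrite uses. The names abstract_value_cp and step_forward_cp are made up, and meet_env would need the same (tag, value) treatment:

```
def abstract_value_cp(env, expr):
    # like abstract_value, but keep the concrete value of constants
    if expr.isdigit():
        return ('CONST', int(expr))
    return env.get(expr, ('UNDEF', None))

def step_forward_cp(insn, env_in):
    # same shape as step_forward, but abstract values are (tag, value) pairs
    if type(insn) == str:
        return
    var, op, expr = insn
    if len(expr) == 1:
        env_in[var] = abstract_value_cp(env_in, expr)
    else:
        e1, op, e2 = expr
        tag1, v1 = abstract_value_cp(env_in, e1)
        tag2, v2 = abstract_value_cp(env_in, e2)
        if tag1 == tag2 == 'CONST':
            env_in[var] = ('CONST', v1 + v2)   # fold the addition
        else:
            env_in[var] = (meet_val(tag1, tag2), None)
```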

Let’s consider a super simple language, where variables are numbers that can only be assigned to or added together. The function meet_val computes the meet for two abstract values, according to this table:
```
        UNDEF  CONST  NAC
      -------------------
UNDEF | UNDEF  UNDEF  NAC
CONST | UNDEF  CONST  NAC
NAC   | NAC    NAC    NAC
```
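As a quick sanity check, meet_val agrees with the table (assuming the definitions above are in scope):

```
assert meet_val('UNDEF', 'CONST') == 'UNDEF'
assert meet_val('CONST', 'CONST') == 'CONST'
assert meet_val('UNDEF', 'NAC') == 'NAC'
assert meet_val('NAC', 'CONST') == 'NAC'
```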
Let’s consider a simple program in this “language” where we don’t specify the constructs for the control flow. The algorithm just assumes that every edge in the CFG is reachable. This is obviously not the case in practice, but that only means that we are going to miss some patterns (the analysis is sound but imprecise in order to terminate).
```
import networkx as nx

class SomeObject:
    pass

def instructionify(somestr):
    toks = somestr.split()
    if '+' in somestr:
        return (toks[0], toks[1], (toks[2], toks[3], toks[4]))
    return tuple(somestr.split())

# setup the program's cfg
prog = nx.DiGraph()

# note the trailing commas: each node is a tuple of instructions,
# i.e. a basic block containing a single instruction
s0 = ('entry'),
s1 = instructionify('b = x'),
s2 = instructionify('c = 2'),
s3 = instructionify('a = 40 + c'),
s4 = instructionify('ret = a + x'),

prog.add_edge(s0, s1)
prog.add_edge(s1, s2)
prog.add_edge(s2, s1)
prog.add_edge(s1, s3)
prog.add_edge(s3, s3)
prog.add_edge(s3, s4)

# initialize pred and succ
pred, succ = {}, {}
for bb in prog:
    pred[bb] = set(prog.predecessors(bb))
    succ[bb] = set(prog.successors(bb))

cfg             = SomeObject()
cfg.nodes       = set(prog.nodes())
cfg.pred        = pred
cfg.succ        = succ
cfg.entry_point = s0

analysis               = SomeObject()
analysis.meet          = meet_env
analysis.step_forward  = step_forward
analysis.step_backward = step_backward

# run the whole thing
IN, OUT = maximal_fixed_point(analysis, cfg)
print 'a at program point s3 is', OUT[s3]['a']
print 'ret at program point s4 is', OUT[s4]['ret']
```
And the output is:

```
a at program point s3 is CONST
ret at program point s4 is UNDEF
```

(ret ends up UNDEF because x is never defined, and the meet of CONST and UNDEF is UNDEF.)
As a final note: it is possible to speed things up a bit by choosing a better ordering for basic blocks than just visiting them in an arbitrary order at first (with a bad ordering, we initially fail to propagate lots of information). This might end up in another blog post. Cheers!
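For the curious, a natural choice is reverse postorder, so that most predecessors of a block are visited before the block itself. Here is a rough sketch of how the forward worklist could be seeded with it, using networkx on the prog graph from the example above; note that maximal_fixed_point would have to pop from an ordered worklist instead of a set for the ordering to matter:

```
import networkx as nx
from collections import deque

# reverse postorder of the CFG: visit a block only after (most of) its
# predecessors, so information propagates in fewer iterations
rpo = list(nx.dfs_postorder_nodes(prog, source=s0))[::-1]

# seed an ordered worklist with it (and use popleft() instead of set.pop())
todo_forward = deque(rpo)
```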
The OCaml version is particularly terse: http://pastebin.com/fsbTFL60 (DSE, not CP)
Indeed. The only thing I really lack in Python compared to OCaml is the match .. with operator. Apart from that, I’m happy with the functional operations (map/reduce/filter) coupled with set operations.
As you are now working at Apple and I don’t see new posts, I’m not sure whether you will see my comment, but I’m going to try anyway.
First of all, it is an excellent post and this is something I was looking for. Unfortunately, the contents are too advanced for me to comprehend easily.
My questions are:
(1) “adapted from the maximal fixedpoint algorithm in the dragon book”
I have the Red dragon book, but so far I cannot identify the section you were referring to. Can you tell me where to find the algorithm that inspired you?
(2) BB (basic block)
I know it is common practice to use BBs (basic blocks) rather than individual instructions.
I’m wondering if I can create a graph by inserting every instruction as a node.
In other words, each instruction is treated as a BB (only one instruction in it).
Because you still need to evaluate IN and OUT at every instruction, I don’t see much difference between the two approaches. Is there any reason that I should group instructions into a BB?
(3) Recommended reading
Could you recommend good books that cover this kind of topic (dataflow analysis)?
Thank you!
Hi Aki, I’m glad you like this post. It feels like I posted this ages ago so you should take this code with a grain of salt now though. To answer your questions:
Dan, thank you for your reply!
I’m creating my own version of a dataflow analysis program in Python, based on yours.
I will try to digest what you have provided and will experiment (tinker around).
Hi Dan,
I have created my Data Flow Analysis code using your code and experimented with it on real examples.
I noticed incorrect converged results and found some minor mistakes in your code.
(1) Need to use .copy()
# first make a pass over each basic block
todo_forward = cfg.nodes => todo_forward = cfg.nodes.copy()
todo_backward = cfg.nodes => todo_backward = cfg.nodes.copy()
(2) Missing cfg. prefix
# compute the environment at the exit of this BB
new_OUT = reduce(analysis.meet, map(IN.get, succ[bb]), OUT[bb])
=> … map(IN.get, cfg.succ[bb]), OUT[bb])
(3) questionable propagation of OUT to IN
def backward_transfer_function(analysis, bb, OUT_bb):
IN_bb = OUT_bb.copy()
>> this just copies OUT to IN blindly, as step_backward() doesn’t do anything.
//
After pondering a while, I decided to change my code to just do forward propagation (no backward propagation).
This is because:
* it is not possible to make any conservative guess of IN from OUT
– in my case, only assembler level instructions are available to make a guess
* My Dragon Book seems to indicate that backward propagation is necessary only for special (challenging) data flow analyses, such as type estimation
– the goals of my data flow analysis are much simpler ones
I have changed my code to do just forward propagation, and it seems to converge to correct IN, OUT values.
//
I would be very grateful if you could comment on backward propagation, as I couldn’t figure out how to do it for regular statements such as b = x.
Thank you!
Aki-
Thanks for the bug fixes. I’m sure there are more or less a million other issues. I don’t quite remember how to make backward propagation work, but for the particular case I was interested in (type inference), it was needed.
This type of thing is also called “monotone frameworks [for data flow analysis]”. Hope this helps.
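For what it’s worth, here is a rough, untested sketch of what a purely backward analysis could look like with the same instruction format, using liveness (“is this variable read later?”) as the example. The names meet_live and step_backward_live are made up for this sketch; a real liveness pass would be run backward only, with meet_live in place of meet_env:

```
def meet_live(lhs, rhs):
    # pointwise "or": a variable is live if it is live along any successor
    result = dict(lhs)
    for var, live in rhs.items():
        result[var] = result.get(var, False) or live
    return result

def step_backward_live(insn, env):
    # env is the state at the exit of the instruction; turn it into the
    # state at its entry: the definition kills liveness, every use revives it
    if type(insn) == str:
        return
    var, op, expr = insn
    env[var] = False
    uses = [expr] if len(expr) == 1 else [expr[0], expr[2]]
    for u in uses:
        if not u.isdigit():
            env[u] = True
```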