# Generic Iterative Dataflow Analysis in Python

Here is an example of a remarkably simple yet powerful dataflow analysis technique. The algorithm is very generic and can be used to implement a number of forward and backward analyses such as constant propagation, reaching definitions, value-set analysis, or in my case type inference.

The algorithm, adapted from the maximal fixedpoint algorithm in the dragon book, takes a control flow graph as input and outputs IN and OUT (maps from basic block to abstract state at their entry and exit (an abstract state maps variables to abstract values)). It is parametric, you must supply it with a few functions that will determine the output of your analysis:

• analysis.meet, takes two abstract states and returns an abstract state (see lattices). To guarantee that the algorithm terminates, this function should be monotone (and your lattice of abstract values of finite height).
• analysis.step_forward (resp. analysis.step_backward), a function that takes an instruction and an abstract state at its entry (resp. exit) and “executes” it, transforming the abstract state. They are used to automatically compute the transfer function for each basic block in the cfg.
It looks like this:
```def forward_transfer_function(analysis, bb, IN_bb):
OUT_bb = IN_bb.copy()
for insn in bb:
analysis.step_forward(insn, OUT_bb)
return OUT_bb

def backward_transfer_function(analysis, bb, OUT_bb):
IN_bb = OUT_bb.copy()
for insn in reversed(bb):
analysis.step_backward(insn, IN_bb)
return IN_bb

def update(env, bb, newval, todo_set, todo_candidates):
if newval != env[bb]:
print '{0} has changed, adding {1}'.format(bb, todo_candidates)
env[bb] = newval
todo_set |= todo_candidates

def maximal_fixed_point(analysis, cfg, init={}):
# state at the entry and exit of each basic block
IN, OUT = {}, {}
for bb in cfg.nodes:
IN[bb] = {}
OUT[bb] = {}
IN[cfg.entry_point] = init

# first make a pass over each basic block
todo_forward = cfg.nodes
todo_backward = cfg.nodes

while todo_backward or todo_forward:
while todo_forward:
bb = todo_forward.pop()

####
# compute the environment at the entry of this BB
new_IN = reduce(analysis.meet, map(OUT.get, cfg.pred[bb]), IN[bb])
update(IN, bb, new_IN, todo_backward, cfg.pred[bb])

####
# propagate information for this basic block
new_OUT = forward_transfer_function(analysis, bb, IN[bb])
update(OUT, bb, new_OUT, todo_forward, cfg.succ[bb])

while todo_backward:
bb = todo_backward.pop()

####
# compute the environment at the exit of this BB
new_OUT = reduce(analysis.meet, map(IN.get, succ[bb]), OUT[bb])
update(OUT, bb, new_OUT, todo_forward, cfg.succ[bb])

####
# propagate information for this basic block (backwards)
new_IN = backward_transfer_function(analysis, bb, OUT[bb])
update(IN, bb, new_IN, todo_backward, cfg.pred[bb])

####
# IN and OUT have converged
return IN, OUT
```
Ideally, to propagate dataflow information in one pass, you would like to have visited every predecessors of a basic block B for a forward pass before analyzing B. Unfortunately, due to irreducible flow graphs you are not guaranteed to be able to do this. Instead, this algorithm
1. starts with an empty state at some arbitrary basic block
2. makes a forward pass and a backward pass over each basic block, adding the successors/predecessors to a worklist when changes are detected
3. continues until the worklist is empty.
The meet function is here to “combine” information from multiple paths, for instance if B2 is reachable from B0 and B1, then IN(B2) = meet(OUT(B1), OUT(B2)). If you wanted to collect value set information and you had:
• OUT(B0) = [a->{1}]
• OUT(B1) = [a-> {-1}]
• then meet could output IN(B2) = [a -> {1, -1}]
Depending on how meet is defined, it can look for information true for all paths coming to a basic block, or for information from at least one path.
Now, some sample code to implement a simple constant propagation analysis. It is forward only for simplicity, but the algorithm works for bidirectional analyses such as type inference.
```
def meet_val(lhs, rhs):
result = None

if lhs == 'NAC' or rhs == 'NAC':
result = 'NAC'

elif lhs == 'UNDEF' or rhs == 'UNDEF':
result = 'UNDEF'

else:
result = 'CONST'

return result

def meet_env(lhs, rhs):
lhs_keys = set(lhs.keys())
rhs_keys = set(rhs.keys())
result = {}

for var in lhs_keys - rhs_keys:
result[var] = lhs[var]

for var in rhs_keys - lhs_keys:
result[var] = rhs[var]

for var in lhs_keys & rhs_keys:
result[var] = meet_val(lhs[var], rhs[var])

return result

def abstract_value(env, expr):
if expr.isdigit():
return 'CONST'

try:
return env[expr]
except KeyError:
return 'UNDEF'

def step_forward(insn, env_in):
if type(insn) == str:
return

var, op, expr = insn

# insn is var = c
if len(expr) == 1:
env_in[var] = abstract_value(env_in, expr)

else:
e1, op, e2 = expr
val1 = abstract_value(env_in, e1)
val2 = abstract_value(env_in, e2)
env_in[var] = meet_val(val1, val2)

def step_backward(insn, env_in):
pass
```

The function step_forward defines the abstract semantics for the statements or instructions of the language you want to analyze and for the analysis you want to implement. For instance here we only collect if a variable at some program point is constant, undefined, or not a constant (NAC). To do the actual propagation, we could also collect the allocation site of the constant.

Let’s consider a super simple language, where variables are numbers that can only be affected to or added together. The function meet_val computes the meet for two abstract values, according to this table:

```        UNDEF  CONST  NAC
-----------------
UNDEF | UNDEF  UNDEF  NAC
CONST | UNDEF  CONST  NAC
NAC   | NAC    NAC    NAC```

Let’s consider a simple program in this “language” where we don’t specify the constructs for the control flow. The algorithm just assumes that every edge in the CFG is reachable. This is obviously not the case in practice, but that only means that we are going to miss some patterns (the analysis is sound but imprecise in order to terminate).

Now, we want to find if a and ret are constants. Here is the code necessary to setup and run the example (you need networkx to run it):
```
import networkx as nx

class SomeObject:
pass

def instructionify(somestr):
toks = somestr.split()
if '+' in somestr:
return (toks, toks, (toks, toks, toks))
return tuple(somestr.split())

# setup the program's cfg
prog = nx.DiGraph()
s0 = ('entry'),
s1 = instructionify('b = x'),
s2 = instructionify('c = 2'),
s3 = instructionify('a = 40 + c'),
s4 = instructionify('ret = a + x'),

# initialize pred and succ
pred, succ = {}, {}
for bb in prog:
pred[bb] = set(prog.predecessors(bb))
succ[bb] = set(prog.successors(bb))

cfg             = SomeObject()
cfg.nodes       = set(prog.nodes())
cfg.pred        = pred
cfg.succ        = succ
cfg.entry_point = s0

analysis               = SomeObject()
analysis.meet          = meet_env
analysis.step_forward  = step_forward
analysis.step_backward = step_backward

# run the whole thing
IN, OUT = maximal_fixed_point(analysis, cfg)
print 'a   at program point s3 is', OUT[s3]['a']
print 'ret at program point s4 is', OUT[s4]['ret']
```

And the output is:

```a   at program point s3 is CONST
ret at program point s4 is UNDEF```

As a final note: it is possible to speed things up a bit by choosing a better ordering for basic blocks than just going randomly at first (because we initially fail to propagate lots of information). This might end up in another blog post. Cheers!

## 7 thoughts on “Generic Iterative Dataflow Analysis in Python”

1. Rolf Rolles says:

The OCaml version is particularly terse: http://pastebin.com/fsbTFL60 (DSE, not CP)

1. dan says:

Indeed. The only thing I really lack in Python compared to OCaml is the match .. with operator. Apart from that, I’m happy with the functional operations (map/reduce/filter) coupled with set operations.

2. Aki Niimura says:

As you are now working at Apple and don’t see new posts, I’m not sure if you can get my post but I’m going to try anyway.

First of all, it is an excellent post and this is something I was looking for. Unfortunately, the contents are too advanced for me to comprehend easily.
My questions are:
(1) adapted from the maximal fixedpoint algorithm in the dragon book,
I have the Red dragon book but so far I cannot identify the section you were referring to. Can you provide where we can find the algorithm that inspired you?
(2) BB (basic block)
I knew it is a common practice to use BB (basic block) rather than individual instructions.
I’m wondering if I can create a graph by inserting every instruction as a node.
In other words, each instruction is treated as a BB (only one instruction in it).
Because you still need to evaluate IN, OUT at every instruction, I don’t see much differences between two approaches. Is there any reason that I should group instructions into a BB?
Could you recommend good books that cover this kind of topics (Dataflow Analysis)?

Thank you!

1. dan says:

Hi Aki, I’m glad you like this post. It feels like I posted this ages ago so you should take this code with a grain of salt now though. To answer your questions:

1. I think I may have been using the 2nd edition (Purple dragon book). That would be in chapter 9 (Machine-Independent Optimizations), section 9.2 (Introduction to Data-Flow Analysis) and 9.4 (Constant Propagation)
2. You can certainly consider every instruction to be its own BB, but over the long run it will save you time and effort to just glob linear sequences of instructions into the same BB. Your graph will have less nodes and less edges. It just generally makes sense.
3. You should get your hands on the latest edition of the dragon book, and “the Muchnick” is also a good reference (Advanced Compiler Design and Implementation). I also strongly recommend looking at everything related to LLVM, it is very nicely engineered.
1. Aki Niimura says:

I’m creating my version of data flow analysis Python program using your program.
I will try to digest what you have provided and will experiment (tinker around).

2. Aki Niimura says:

Hi Dan,

I have created my Data Flow Analysis code using your code and experimented it with real examples.
I noticed incorrect converged results and found some minor mistakes in your code.
(1) Need to use .copy()
# first make a pass over each basic block
todo_forward = cfg.nodes => todo_forward = cfg.nodes.copy()
todo_backward = cfg.nodes => todo_backward = cfg.nodes.copy()
(2) Missing class
# compute the environment at the exit of this BB
new_OUT = reduce(analysis.meet, map(IN.get, succ[bb]), OUT[bb])
=> … map(IN.get, cfg.succ[bb]), OUT[bb])
(3) questionable propagation of OUT to IN
def backward_transfer_function(analysis, bb, OUT_bb):
IN_bb = OUT_bb.copy()
>> this just copies OUT to IN blindly as step_backward() doesn’t do any.

//

After pondering a while, I decided to change my code to just do forward propagation (no backward propagation).
This is because:
* it is not possible to make any conservative guess of IN from OUT
– in my case, only assembler level instructions are available to make a guess
* My Dragon Book seems to indicate that backward propagation is necessary only for special (challenging) data flow analysises, such as estimation of type
– the goals of my data flow analysis are much simpler ones

I have changed my code to do just forward propagation and seems converging to correct IN, OUT values.

//

I’m very grateful if you can comment about backward propagation as I couldn’t figure out how to do backward propagation using regular statements, such as b=x.

Thank you!
Aki-

3. dan says:

Thanks for the bug fixes. I’m sure there are more or less a million other issues. I don’t quite remember how to make backward propagation work, but for the particular case I was interested in (type inference), it was needed.

This type of thing is also called “monotone frameworks [for data flow analysis]”. Hope this helps.