Nodes
The nodes that form the DAG are declared as normal Python functions, decorated with one of the node decorators such as evalnode().
Nodes are callable objects that take no arguments. Calling them either invokes the node function or returns the previous cached result if no dependencies of the node have changed.
Nodes may only be called from other node functions. Calling a node outside of a node function will result in an error. Evaluating a node outside of another node function must be done via a context object.
Dependencies between nodes are discovered at run-time as the nodes are evaluated. The context keeps track of what node is currently being evaluated and as that node references other nodes it adds the edges to the DAG. If a node is conditionally evaluated from another node function, that dependency is only discovered once that condition is met and the branch evaluating the other node is executed.
Nodes are evaluated from within other node functions (or any function called by a node function) by calling them:
    from mdf import evalnode

    @evalnode
    def node_function():
        """
        this function is actually a node because of the
        use of the @evalnode decorator
        """
        # to evaluate other nodes they just need to be called
        value = another_node()

        # do some calculation
        result = ...
        return result

    @evalnode
    def another_node():
        # do some calculation possibly involving other nodes
        result = ...
        return result
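The run-time discovery of dependencies described above can be illustrated with a small plain-Python sketch. It does not use mdf and is not how mdf is implemented: ToyContext, evaluate and the three node functions are hypothetical names invented for this illustration, and the toy does no caching. It only shows how a context that tracks the node currently being evaluated can record edges as they are referenced, and why a conditional dependency is not seen until its branch runs.

```python
class ToyContext:
    """Toy stand-in for a context (hypothetical, not mdf's MDFContext)."""

    def __init__(self):
        self.stack = []     # nodes currently being evaluated
        self.edges = set()  # (caller, callee) pairs discovered so far

    def evaluate(self, node):
        if self.stack:
            # the node on top of the stack is referencing `node`
            self.edges.add((self.stack[-1].__name__, node.__name__))
        self.stack.append(node)
        try:
            return node(self)
        finally:
            self.stack.pop()


use_discount = False


def price(ctx):
    return 10.0


def discount(ctx):
    return 0.5


def total(ctx):
    value = ctx.evaluate(price)
    if use_discount:
        # this dependency is only discovered once the branch is taken
        value *= ctx.evaluate(discount)
    return value


ctx = ToyContext()
first = ctx.evaluate(total)
edges_before = set(ctx.edges)

use_discount = True
second = ctx.evaluate(total)
edges_after = set(ctx.edges)
```

After the first evaluation only the edge from total to price is known; the edge from total to discount appears only after the condition becomes true.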
Time Dependent Nodes
Nodes are marked as requiring re-calculation whenever any of their dependencies are modified. They are later lazily evaluated as required.
There’s a builtin node now() that behaves in a more specialized way and allows node valuations to evolve over time.
When the now() node is advanced, which can be done manually via MDFContext.set_date(), all the nodes dependent on time are marked as requiring re-calculation, and they are additionally marked to record that the reason they require re-calculation is that time has moved forwards.
evalnode() nodes can be generators instead of regular functions (generators yield values rather than return a single value). When a generator is used, mdf will advance the generator of the node each time the now() node is advanced. This allows state to be maintained between valuations:
    from mdf import MDFContext, evalnode, now
    from datetime import datetime, timedelta

    @evalnode
    def time_dependent_node():
        """
        a simple node whose value is dependent on 'now'
        """
        # returns 0, 1, 2, ... for the current weekday
        return now().weekday()

    @evalnode
    def incrementally_updated_node():
        """
        the value of this node is the sum of another node
        """
        # the first value will simply be time_dependent_node
        todays_value = time_dependent_node()
        yield todays_value

        # when the date is advanced this generator is continued
        # until the next yield
        while True:
            # yield today's value + the value evaluated previously
            prev_value = todays_value
            todays_value = time_dependent_node()
            yield todays_value + prev_value

    # create the context with an initial date
    date = datetime(2011, 9, 2)
    ctx = MDFContext(date)

    # get the value of incrementally_updated_node
    x = ctx[incrementally_updated_node]
    # x is now 4 (Friday)

    # advance the date one day
    date += timedelta(days=1)

    # set the date on the context to be invoked (this causes the
    # incrementally_updated_node generator to be advanced)
    ctx.set_date(date)

    # get the value of incrementally_updated_node
    x = ctx[incrementally_updated_node]
    # x is now 9 : 4 (Friday) + 5 (Saturday) = 9
This is a simple example, but the same approach can be used to build more complex nodes whose time-dependent values are calculated incrementally.
If time is ever moved backwards by calling MDFContext.set_date() then the current state of the time dependent nodes is discarded and the initial state will be re-evaluated by restarting the generators.
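The advance-and-restart behaviour can be pictured with a plain-Python sketch that does not use mdf. ToyGeneratorNode, value_at and running_count are hypothetical names made up for this illustration, and the logic is an assumption about the general idea rather than a description of mdf's internals: the generator is continued to its next yield when the date moves forwards and is started again from scratch when the date moves backwards.

```python
from datetime import date, timedelta


class ToyGeneratorNode:
    """Toy illustration only (hypothetical, not part of mdf)."""

    def __init__(self, func):
        self.func = func
        self.gen = None
        self.last_date = None
        self.value = None

    def value_at(self, now):
        if self.gen is None or now < self.last_date:
            # first evaluation, or time moved backwards:
            # discard any state and start from the initial value
            self.gen = self.func()
            self.value = next(self.gen)
        elif now > self.last_date:
            # time moved forwards: continue to the next yield
            self.value = next(self.gen)
        # same date as before: keep the cached value
        self.last_date = now
        return self.value


def running_count():
    count = 1
    while True:
        yield count
        count += 1


node = ToyGeneratorNode(running_count)
start = date(2011, 9, 2)

a = node.value_at(start)                      # initial state
b = node.value_at(start + timedelta(days=1))  # generator advanced once
c = node.value_at(start)                      # moved back: generator restarted
```

Here a and c are both the initial value, because moving back to the start date throws away the state accumulated while stepping forwards.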
Filtering
For nodes that update incrementally with time sometimes it’s useful to be able to specify whether the update should be called or not for a particular date rather than have to check inside the update function.
For example, some values might only need updating on valid business days but the context might be stepped through all calendar dates for a date range:
    from mdf import evalnode

    def my_node_filter():
        # the filtered evalnode will only be advanced on business days
        if my_is_valid_business_day_function():
            return True
        return False

    @evalnode(filter=my_node_filter)
    def my_node():
        yield some_initial_value
        while True:
            do_some_update_calculation(...)
            yield updated_value
The filter could be a node instead of a function. This is convenient if you need to apply the same filter to multiple nodes as it won’t be re-calculated more than necessary.
To make it easy to get a filter relating to a specific series of data there’s a function filternode() to create a node that returns True when the current date is in the index of that data, or False otherwise. This makes it simpler to perform calculations at the frequency of the underlying data.
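The idea of a filter derived from the index of a data series can be sketched in plain Python. This is not filternode() itself and makes no claim about its signature: make_index_filter and the prices dictionary are hypothetical and exist only for this illustration. The sketch steps through every calendar date in a range and records only the dates on which a filtered node would be updated.

```python
from datetime import date, timedelta

# hypothetical data series keyed by date (business days only)
prices = {
    date(2011, 9, 2): 100.0,  # Friday
    date(2011, 9, 5): 101.0,  # Monday
    date(2011, 9, 6): 99.5,   # Tuesday
}


def make_index_filter(data):
    """Return a function that is True only for dates in the data's index."""
    index = set(data)

    def in_index(now):
        return now in index

    return in_index


price_filter = make_index_filter(prices)

# step through all calendar dates, updating only when the filter passes
updates = []
day = date(2011, 9, 2)
while day <= date(2011, 9, 6):
    if price_filter(day):
        updates.append(day)
    day += timedelta(days=1)
```

The weekend dates are stepped over without any update, so the calculation runs at the frequency of the underlying data.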
Queue Nodes
Queue nodes are a specialized time-dependent node. The value of the node is a double ended queue (see collections.deque) of values. A double ended queue is used as it supports efficient appending and popping at both ends of the queue. Queues can also be used to construct numpy arrays and regular Python lists.
The node function is called each time the node now() is advanced and the result is appended to the queue. The value of the node is the queue itself, which should be regarded as immutable.
Below is an example that uses a queue to get a delayed value:
    import numpy as np
    from mdf import evalnode, queuenode

    @queuenode
    def some_value_queue():
        # do some calculations
        return result

    @evalnode
    def delayed_value():
        values = some_value_queue()  # type is collections.deque
        if len(values) < 5:
            return np.nan

        # return the value calculated 4 timesteps ago
        # (the item at -1 is the value for now)
        return values[-5]
Queue nodes can be bounded so they don’t grow indefinitely. This is done by setting the size of the queue. Once the queue reaches that size, older items will be popped off the queue. The size can be specified either as an integer value or as a callable object (e.g. a function or node), which can be useful if the size is a function of another node. Once the queue is created the size is fixed for that context:
    # keep at most 5 values
    @queuenode(size=5)
    def some_value_queue():
        # do some calculations
        return result

    #
    # or calculate the size as a function (or node)
    #
    def get_queue_size():
        return 5

    @queuenode(size=get_queue_size)
    def some_other_value_queue():
        # do some calculations
        return result
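The bounded behaviour is the same as that of a standard-library collections.deque created with maxlen. The following sketch uses only the standard library, not mdf, to show older items being dropped once the size is reached and the queue being converted to a regular list.

```python
from collections import deque

unbounded = deque()
bounded = deque(maxlen=5)  # keep at most 5 values

for step in range(8):
    unbounded.append(step)
    bounded.append(step)   # once full, the oldest item is discarded

latest = bounded[-1]       # the most recently appended value
as_list = list(bounded)    # a deque converts directly to a list
```

After eight appends the bounded queue holds only the last five values, while the unbounded one holds all eight.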
Because queue nodes are a specialization of the eval node, they may also be filtered in the same way. If a filter is applied, values are calculated and appended to the queue only when the filter returns True.
Other Node Types
While eval nodes can be used to calculate any type of value, commonly used valuation types can be packaged as other node types for convenience. Currently the list of these specialized nodes is quite small, but as more use-cases are presented it’s reasonable to expect this list to grow.
Because these nodes are all specializations of the eval node, they may also be filtered in the same way. If a filter is applied, values are calculated or updated only when the filter returns True.
Delay Node
The delay node type is closely related to the queue node type. The delaynode() node type delays the value returned for a number of timesteps that can be specified as the periods parameter to that function:
    from mdf import evalnode, delaynode

    @delaynode(periods=10)
    def a_delayed_value():
        return some_value

    @evalnode
    def some_other_value():
        x = a_delayed_value()  # this is the value returned by a_delayed_value as it
                               # was 10 timesteps ago
The value of the node before the number of periods has elapsed can be set using the initial_value parameter. The node’s value will be this until enough timesteps have elapsed. By default the initial value is None.
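The relationship between periods and initial_value can be sketched without mdf using a bounded deque. The delayed function below is a hypothetical helper written for this illustration, not mdf's implementation: it yields the initial value until enough timesteps have elapsed and from then on yields the value from periods steps earlier.

```python
from collections import deque


def delayed(values, periods, initial_value=None):
    """Yield each value as it was `periods` steps earlier (toy sketch)."""
    # periods + 1 slots: the current value plus `periods` older ones
    window = deque(maxlen=periods + 1)
    for value in values:
        window.append(value)
        if len(window) <= periods:
            # not enough history yet
            yield initial_value
        else:
            yield window[0]


with_initial = list(delayed([1, 2, 3, 4, 5], periods=2, initial_value=0))
with_default = list(delayed([1, 2, 3], periods=1))
```

With periods=2 the first two results are the initial value, and with no initial value given the early results are None.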
The function decorated with delaynode() may be called when the node is evaluated if it hasn’t already been called for the current timestep or if any of its dependencies have changed. This can be a problem if attempting to set up a recursive relationship such as:
    @delaynode(periods=1, initial_value=0)
    def delayed_a():
        return a()

    @evalnode
    def a():
        return 1 + delayed_a()
Even though the value for delayed_a() should be available before a() is evaluated, this still results in an infinite recursion as evaluating delayed_a() will result in a recursive call to a().
To solve this problem delaynodes may optionally be lazily evaluated by setting the lazy kwarg to True:
    @delaynode(periods=1, initial_value=0, lazy=True)
    def delayed_a():
        return a()
This is not the default because dependencies are discovered at run-time, and so delaying evaluation of a node will result in dependencies being added in a later timestep that alter the structure of the DAG. When using shifted contexts this can be a problem. If mdf thinks that a node can use a parent context of a shifted context, and the dependencies later change in a way that breaks that assumption, a ConditionalDependencyError will be thrown.
The way to fix a problem with conditional dependencies is to make them unconditional. In the case of delayed nodes this can be done by making the initial_value an evalnode() that has the same dependencies (or at least the ones that are sensitive to the shift) as the delayed node function.
NaN Sum Node
The nansumnode() node type calculates the sum of the values returned by its function as now() is advanced. Values that are NaN are excluded from the sum:
    from mdf import evalnode, nansumnode

    @nansumnode
    def some_value():
        return some_value

    @evalnode
    def sum_of_some_value():
        value_sum = some_value()  # this is the sum of 'some_value' for all time steps so far
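The arithmetic of a NaN-excluding running sum can be shown in plain Python. The running_nansum helper is hypothetical and written only for this illustration; it is not mdf's implementation, and each input value stands in for what the node function would return at one timestep.

```python
import math


def running_nansum(values):
    """Yield the running sum of values, skipping any NaN (toy sketch)."""
    total = 0.0
    for value in values:
        if not math.isnan(value):
            total += value
        # a NaN input leaves the running total unchanged
        yield total


sums = list(running_nansum([1.0, float("nan"), 2.5, 4.0]))
```

The NaN at the second step is ignored, so the total at that step is the same as at the first.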
Cumulative Product Node
The cumprodnode() node type calculates the cumulative product of the values returned by its function as now() is advanced:
    from mdf import evalnode, cumprodnode

    @cumprodnode
    def some_value():
        return some_value

    @evalnode
    def product_of_some_value():
        value_prod = some_value()  # this is the cumulative product of 'some_value'
                                   # for all time steps so far
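For reference, the same cumulative product can be computed over a plain list with the standard library. This sketch does not use mdf; the factors list is made-up data in which each entry stands in for the node function's value at one timestep.

```python
from itertools import accumulate
import operator

# hypothetical per-timestep values
factors = [1.5, 2.0, 0.5, 4.0]

# running product: each entry is the product of all values so far
cumulative = list(accumulate(factors, operator.mul))
```

Each element of cumulative corresponds to the value the node would have at that timestep.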
Apply Node
The applynode() node type applies an arbitrary function to the value returned by the node function. You can optionally supply additional args and kwargs that will be passed in to the function; if any of these arguments are nodes then they will be evaluated and the result will be passed in.
For example, to add the values of existing nodes A and B:
    import operator
    A_plus_B = A.apply(operator.add, args=(B,))
Or you can get the node:
    A_plus_B_node = A.applynode(operator.add, args=(B,))
And then chain additional node types onto it, such as a cumulative product:
    cumprod_of_A_plus_B = A_plus_B_node.cumprod(...)
NB: Unlike most other node types, the applynode shouldn’t be used as a decorator, but instead should only be used via the method syntax for node types (see Method Syntax For Node Types, below).
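The argument handling described for apply nodes can be sketched in plain Python. In this toy version, which is an assumption made for illustration and not mdf's implementation, a "node" is just a zero-argument callable, and apply_to is a hypothetical helper that evaluates any callable arguments before passing the results to the function.

```python
import operator


def apply_to(node, func, args=(), kwargs=None):
    """Apply func to node's value, evaluating any node arguments first."""
    kwargs = kwargs or {}
    # arguments that are themselves 'nodes' (callables) are evaluated
    resolved_args = [a() if callable(a) else a for a in args]
    resolved_kwargs = {k: (v() if callable(v) else v) for k, v in kwargs.items()}
    return func(node(), *resolved_args, **resolved_kwargs)


def A():
    return 2


def B():
    return 3


a_plus_b = apply_to(A, operator.add, args=(B,))     # B is evaluated first
a_times_10 = apply_to(A, operator.mul, args=(10,))  # plain values pass through
```

Both node arguments and plain values are accepted, which mirrors the behaviour described for args and kwargs above.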
Method Syntax For Node Types
Creating a new node for simple operations on an existing node can make code look bloated and difficult to follow.
For this reason every node type is also exposed as methods on all other nodes. This is a syntactic helper and the end result is exactly the same as if a new node using the node type decorator was used.
This is best illustrated by example:
    from mdf import evalnode, cumprodnode
    from random import random

    @evalnode
    def random_value():
        while True:
            yield random()
If you wanted to compute the cumulative product of this random value you could do it by creating a new node using the cumprodnode() decorator:
    @cumprodnode(half_life=10)
    def cumulative_product_of_random_value():
        return random_value()
But if there are many nodes this can become a bit awkward. Using the method syntax the same thing can be achieved as follows:
    @evalnode
    def some_other_node():
        cumprod_of_random_value = random_value.cumprod(half_life=10)
        # do some more calculation
        return result
When the cumprod method on the random_value node is called an internal node is created for that cumulative product calculation. Each subsequent time it’s called that internal node is re-used and so the effect is exactly the same as if the cumprod node was created explicitly.
All of the standard node types have corresponding methods, and custom node types can optionally expose themselves as methods.
In addition, there is also a method that returns the internal implicitly created node. This allows for chaining, e.g.:
    @evalnode
    def some_other_node():
        cumprod_of_random_value_node = random_value.cumprodnode(half_life=10)
        delayed_cumprod = cumprod_of_random_value_node.delay(periods=10, initial_value=0)
        # do some more calculation
        return result
Or more simply:
    @evalnode
    def some_other_node():
        delayed_cumprod = random_value.cumprodnode(half_life=10).delay(periods=10, initial_value=0)
        # do some more calculation
        return result
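The re-use of the implicitly created node can be pictured with a small plain-Python sketch. ToyNode and its scaled and scalednode methods are hypothetical and are not part of mdf; the sketch only assumes the general idea described above, namely that the derived node is created on first use, cached against its arguments, and returned again on later calls, with one method giving the value and a companion method giving the node for chaining.

```python
class ToyNode:
    """Toy node with a method-style derived node type (hypothetical)."""

    def __init__(self, func):
        self.func = func
        self._derived = {}  # cache of implicitly created nodes

    def __call__(self):
        return self.func()

    def scalednode(self, factor):
        # return the internal node, creating it only on first use
        key = ("scaled", factor)
        if key not in self._derived:
            self._derived[key] = ToyNode(lambda: self.func() * factor)
        return self._derived[key]

    def scaled(self, factor):
        # value form: evaluate the internal node
        return self.scalednode(factor)()


base = ToyNode(lambda: 21)

first = base.scalednode(2)
second = base.scalednode(2)                   # same internal node is re-used
value = base.scaled(2)
chained = base.scalednode(2).scalednode(10)() # node form allows chaining
```

Because the derived node is keyed on its arguments, calling the method repeatedly with the same arguments does not create new nodes.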