Tutorial: Missing values

Tutorial: missing value handling

This tutorial illustrates the usage of Transducers.jl by stepping through various handling of missing values.

using Transducers

Dot product

Here is a simple way to compute a dot product using mapfoldl and MapSplat:

mapfoldl(MapSplat(*), +, zip(1:3, 10:2:14))

Let's see what it does step by step. First we create a "printer" transducer using the following function (see Map):

xf_printer(label) = Map() do x
    println(label, ": ", x)
    return x  # just return it as-is

This transducer just pass-through the input while printing its value (prefixed by a label). Let's sandwich the previous MapSplat(*) with it:

    xf_printer(" input") |> MapSplat(*) |> xf_printer("output"),
    +, zip(1:3, 10:2:14))
 input: (1, 10)
output: 10
 input: (2, 12)
output: 24
 input: (3, 14)
output: 42

You can see that the input tuple (1, 10) is splatted into function * by MapSplat which then outputs 10. This is repeated for all inputs.

Perhaps unfortunately, this way of computing a dot product propagates any missing values contained in the input arrays to the result (which may actually be desired in certain cases).

xs = [1, missing, 3, 2]
ys = [10, 14, missing, 12]
mapfoldl(MapSplat(*), +, zip(xs, ys))

However, it is very simple to ignore any missing values using OfType:

xf_mdot = OfType(Tuple{Vararg{Number}}) |> MapSplat(*)
mapfoldl(xf_mdot, +, zip(xs, ys))

Here, Tuple{Vararg{Number}} is a type that matches with a tuple of any length with numbers. It does not match with a tuple if it has a missing.

@assert (1, 0.5) isa Tuple{Vararg{Number}}
@assert (1, 0.5, 2im) isa Tuple{Vararg{Number}}
@assert !((1, missing) isa Tuple{Vararg{Number}})


Transducer xf_mdot above can also be used to compute the covariance. First, we need the number of pairs of elements in xs and ys that both of them are not missing:

nonmissings = mapfoldl(OfType(Tuple{Vararg{Number}}) |> Count(),
                       zip(xs, ys);
                       init = 0)

We do this by using Count and right. Count ignores input and count the number of times the input is provided. Since OfType(Tuple{Vararg{Number}}) provides the inputs to the downstream transducer only if there is no missing values, this correctly counts the number of non-missing pairs. Function right is simply defined as right(l, r) = r (and right(r) = r). Thus, the whole mapfoldl returns the last output of Count. In case Count never gets called (i.e., there are no non-missing pairs), we pass init=0.

mapfoldl(OfType(Tuple{Vararg{Number}}) |> Count(),
         zip(Int[], Int[]);
         init = 0)

Finally, we have to pre-process the input to xf_mdot by subtracting the average. It's easy to do with Map:

using Statistics: mean

function xf_demean(xs, ys)
    xmean = mean(skipmissing(xs))
    ymean = mean(skipmissing(ys))
    return Map(((x, y),) -> (x - xmean, y - ymean))

mapfoldl(xf_demean(xs, ys) |> xf_mdot, +, zip(xs, ys)) / nonmissings


How do we use transducers for vector-to-vector transformation? Here is a function to calculate $y = x + y$ while ignoring missing values in $x$. First, mandatory input shape check:

function add_skipmissing!(ys, xs)
    length(ys) == length(xs) || error("length(ys) != length(xs)")
    firstindex(ys) == 1 || error("firstindex(ys) != 1")

For filtering out missing values from xs while tracking indices, we use Enumerate and Filter. To iterate over the output of the transducer, foreach is used instead of mapfoldl since mutating an array is better expressed as a side-effect than a fold.

    foreach(Enumerate() |> Filter(!(ismissing ∘ last)), xs) do (i, xi)
        @inbounds ys[i] += xi

We then return the mutated value to behave like the rest of Julia functions (push!, mul!, etc.):

    return ys


add_skipmissing!([100, 110, 120], [1, missing, 2])
3-element Array{Int64,1}:

Vectorized reduction

foldl, mapfoldl, etc. in Base support dims argument. Transducers.jl does not support this argument as of writing. However, this can easily be emulated using eachcol, eachrow, or eachslice iterators in Base.

xs = [
    0       missing 1       2
    3       4       5       missing
    missing 6       7       missing

function xf_sum_columns(prototype)
    T = Base.nonmissingtype(eltype(prototype)) # subtract Missing from type
    dims = size(prototype)
    return Scan(add_skipmissing!, Initializer(_ -> zeros(T, dims)))

We use Initializer here to allocate the "output array" into which the columns are added by add_skipmissing!.

mapfoldl(xf_sum_columns(xs[:, 1]), right, eachcol(xs))
3-element Array{Int64,1}:

Above computation returns the sum over each row without taking into account the relationship within a column. Another possibly useful reduction is the sum of the columns with no missing values. This can easily be done by prepending a filter:

mapfoldl(Filter(x -> !any(ismissing, x)) |> xf_sum_columns(xs[:, 1]),
         right, eachcol(xs))
3-element Array{Int64,1}:

Note that above combination of Scan and right is redundant. For example, we can simply pass add_skipmissing! to "normal" foldl:

foldl(add_skipmissing!, eachcol(xs), init=zeros(Int, size(xs, 1)))
3-element Array{Int64,1}:

However, packaging it as a transducer is sometimes useful as it can be composed with other transducers and "bottom" reducing function. For example, vectorized version of cumsum can easily obtained by composing it with append! (and then reshape after mapfoldl):

result = mapfoldl(
    xf_sum_columns(xs[:, 1]),
    init = Int[])
reshape(result, (size(xs, 1), :))
3×4 Array{Int64,2}:
 0  0   1   3
 3  7  12  12
 0  6  13  13

Note that we need Completing here since append! does not have the unary method used for complete protocol.


Another useful operation to do ignoring missing values is argmax/argmin. It can be implemented using Enumerate() |> Filter(!(ismissing ∘ last)) (see also add_skipmissing! above) composed with ScanEmit. We first need to define a function to be called by ScanEmit:

#                     ,--- current state
#                     |
#                     |              ,-- input
#                     |              |
function argmax_step((argmax, max), (index, value))
    argmax, max = value > max ? (index, value) : (argmax, max)
    return argmax, (argmax, max)
    #       \        \
    #        \        \__ next state
    #         \
    #          \__ output

This function is passed to ScanEmit with the initial state:

xf_argmax = Enumerate() |> Filter(!(ismissing ∘ last)) |>
    ScanEmit(argmax_step, (0, typemin(Int)))
#                          |
#                   initial state

As ScanEmit is one of the most complex (and powerful) transducer, it may require some comments on how above code works:

We have the argmax function by extracting the last output of xf_argmax:

mapfoldl(xf_argmax, right, [1, 3, missing, 2])

Side note: We use typemin(Int) as the initial value of max for simplicity. In practice, it should be typemin(eltype(input_array)). See the next section for how to do it using Initializer.


Transducer xf_argmax in the previous section only outputs the index of the maximum element so far. To output the maximum element as well, we can simply use Scan. Also, while we are at it, let's support both argmax and argmin. To this end, we parametrize the function passed to Scan by the comparison function > and <. Following function argext_step takes the function > or < and return a function appropriate for Scan.

argext_step(should_update) =
    ((oldindex, oldvalue), (index, value)) ->
        should_update(oldvalue, value) ? (index, value) : (oldindex, oldvalue)

Another problem with xf_argmax is that it does not handle non-Int input types. To properly handle different input types, we use Initializer. Initializer needs a function that maps an input type to the initial state. We first define a helper function

init_helper(::typeof(>), ::Type{Tuple{F, S}}) where {F, S} = (zero(F), typemax(S))
init_helper(::typeof(<), ::Type{Tuple{F, S}}) where {F, S} = (zero(F), typemin(S))

...which is partially evaluated before passed to Initializer:

argext_init(should_update) = Initializer(TT -> init_helper(should_update, TT))

Finally we construct a Scan transducer using argext_step and argext_init:

xf_scanext(should_update) = Scan(argext_step(should_update),

Passing < gives us the argmax transducer:

    Enumerate() |> OfType(Tuple{Integer, Number}) |> xf_scanext(<),
    right, [1.0, 3.0, missing, 2.0])

We use OfType(Tuple{Integer, Number}) instead of Filter(!(ismissing ∘ last)) as typemin used in xf_scanext requires to know that the input type does not include missing.

We now have transducers xf_scanext(<) and xf_scanext(>) for argmax and argmin, respectively. We can compute them concurrently by Zip'ing them together:

xf_fullextrema = Enumerate() |> OfType(Tuple{Integer, Number}) |>
    Zip(xf_scanext(>), xf_scanext(<))

mapfoldl(xf_fullextrema, right, [1.0, 3.0, -1.0, missing, 2.0])
((3, -1.0), (2, 3.0))

This transducer produces a tuple ((argmin, min), (argmax, max)). To output only indices, append an appropriate Map:

xf_argextrema =
    xf_fullextrema |> Map() do ((argmin, min), (argmax, max))
        (argmin, argmax)

mapfoldl(xf_argextrema, right, [1.0, 3.0, -1.0, missing, 2.0])
(3, 2)

This page was generated using Literate.jl.