ThreadsX.jl
ThreadsX — Module
Threads⨉: Parallelized Base functions
tl;dr
Add the prefix ThreadsX. to functions from Base to get some speedup, if supported. Example:
julia> using ThreadsX
julia> ThreadsX.sum(gcd(42, i) == 1 for i in 1:10_000)
2857
To find out which functions ThreadsX.jl supports, type ThreadsX. and press TAB in the REPL:
julia> ThreadsX.
MergeSort any findfirst map! reduce
QuickSort collect findlast mapreduce sort
Set count foreach maximum sort!
StableQuickSort extrema issorted minimum sum
all findall map prod unique
Interoperability
Rich collection support
The reduce-based functions support any collection that implements the SplittablesBase.jl interface, including arrays, Dict, Set, and iterator transformations. In particular, these functions support iterator comprehensions:
julia> ThreadsX.sum(y for x in 1:10 if isodd(x) for y in 1:x^2)
4917
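The same reduce-based functions also work on a Dict and a Set, as stated above. The sketch below assumes only that ThreadsX.jl is installed; it reduces over the `k => v` pairs of a Dict (using `last` to pick the value) and over a Set, and compares the results with the sequential Base versions.

```julia
using ThreadsX

d = Dict(i => i^2 for i in 1:100)   # Dict{Int,Int}
s = Set(1:100)

# Reduce over the pairs of a Dict; `last(k => v)` returns `v`.
total_sq = ThreadsX.sum(last, d)
@assert total_sq == sum(last, d)

# Reduce over a Set.
set_sum = ThreadsX.sum(s)
set_max = ThreadsX.maximum(s)
```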
For advanced usage, they also support Transducers.eduction constructed with parallelizable transducers.
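A minimal sketch of passing an eduction to a ThreadsX reduction, assuming Transducers.jl has been added to the active environment (ThreadsX.jl depends on it, but `using Transducers` needs it to be loadable from the current project). `Filter` and `Map` are used here as examples of parallelizable transducers.

```julia
using ThreadsX
using Transducers: eduction, Filter, Map

# An eduction is a lazily transformed collection.
odds = eduction(Filter(isodd), 1:10)      # 1, 3, 5, 7, 9
squares = eduction(Map(x -> x^2), 1:10)   # 1, 4, 9, ..., 100

odd_sum = ThreadsX.sum(odds)
square_sum = ThreadsX.sum(squares)
```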
OnlineStats.jl
ThreadsX.reduce supports an OnlineStat from OnlineStats.jl as the first argument as long as it implements the merging interface:
julia> using OnlineStats: Mean
julia> ThreadsX.reduce(Mean(), 1:10)
Mean: n=10 | value=5.5
API
ThreadsX.jl aims to provide an API compatible with Base functions so that Julia programs can be parallelized easily.
All functions that exist directly under the ThreadsX namespace are public API, and they implement a subset of the API provided by Base. Everything inside ThreadsX.Implementations is an implementation detail.
The public API functions of ThreadsX expect the data structures and functions passed as arguments to be "thread-friendly", in the sense that operating on distinct elements of the given container from multiple tasks in parallel is safe. For example, ThreadsX.sum(f, array) assumes that executing f(::eltype(array)) and accessing elements as in array[i] from multiple threads is safe. In particular, this is the case if array is a Vector of immutable objects and f is a pure function in the sense that it does not mutate any global objects. Note that it is neither required nor recommended to use a "thread-safe" array that protects access to array[i] with a lock.
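To make "thread-friendly" concrete, here is a small sketch (assuming ThreadsX.jl is installed) contrasting a pure function over a Vector of immutable Ints with the kind of shared mutation the text warns against. The unsafe variant is kept as a comment only; the safe alternative lets ThreadsX build the output instead of pushing into a shared container.

```julia
using ThreadsX

xs = collect(1:1_000)   # Vector of immutable Ints

# Thread-friendly: `x -> x^2` is pure and only reads `xs[i]`.
total = ThreadsX.sum(x -> x^2, xs)

# NOT thread-friendly (do not run): `push!` on a shared vector from
# several tasks at once is a data race.
#   seen = Int[]
#   ThreadsX.foreach(x -> push!(seen, x), xs)

# Thread-friendly alternative: return values and let ThreadsX collect them.
squares = ThreadsX.map(x -> x^2, xs)
```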
In addition to the Base API, all functions accept the keyword argument basesize::Integer to configure the number of elements processed by each task. A large value is useful for minimizing the overhead of using multiple threads. A small value is useful for load balancing when the time to process a single item varies a lot from item to item. The default value of basesize for each function is currently an implementation detail.
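The two regimes described above can be sketched as follows, assuming ThreadsX.jl is installed. `work` is a hypothetical workload whose cost grows with its argument, standing in for items whose processing time varies a lot; the specific basesize values are illustrative, not tuned recommendations.

```julia
using ThreadsX

xs = rand(1_000_000)

# Cheap, uniform work per element: a large basesize keeps the number of tasks small.
s = ThreadsX.sum(xs; basesize = 100_000)

# Hypothetical irregular workload: cost is proportional to `n`.
work(n) = sum(sqrt(k) for k in 1:n)
costs = rand(1:10_000, 64)

# Expensive, uneven items: basesize = 1 lets each item become its own task.
results = ThreadsX.map(work, costs; basesize = 1)
```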
The ThreadsX.jl API is deterministic in the sense that the same input produces the same output, independent of how Julia's task scheduler decides to execute the tasks. However, note that basesize is part of the input, and its default may be set based on Threads.nthreads(). To make the result of the computation independent of the Threads.nthreads() value, basesize must be specified explicitly.
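A sketch of the determinism guarantee, assuming ThreadsX.jl is installed. Floating-point addition is not associative, so the exact result of a parallel sum depends on how the input is split. Fixing basesize fixes the split, so repeated calls return bitwise-identical results. A different basesize is a different input and may change the last few bits.

```julia
using ThreadsX

xs = rand(100_000)

# Same input, including basesize: same reduction tree, same result.
a = ThreadsX.sum(xs; basesize = 1_024)
b = ThreadsX.sum(xs; basesize = 1_024)

# Different basesize: results agree approximately, not necessarily bitwise.
c = ThreadsX.sum(xs; basesize = 4_096)
```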
Limitations
- Keyword argument dims is not supported yet.
- (There are probably more.)
Implementations
Most of the reduce-based functions are implemented as thin wrappers of Transducers.jl.
Custom collections can support the ThreadsX.jl API by implementing the SplittablesBase.jl interface.
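Below is a minimal sketch of a custom collection that opts in through SplittablesBase.jl. It assumes SplittablesBase.jl is added to the active environment. `MyRange` is a hypothetical type written only for this illustration. It implements ordinary iteration plus `SplittablesBase.amount` (approximate size) and `SplittablesBase.halve` (split into two halves).

```julia
using ThreadsX
import SplittablesBase

# Hypothetical collection: the integers lo..hi stored as a plain struct.
struct MyRange
    lo::Int
    hi::Int
end

# Ordinary iteration protocol, used for the sequential base case.
Base.length(r::MyRange) = max(0, r.hi - r.lo + 1)
Base.eltype(::Type{MyRange}) = Int
Base.iterate(r::MyRange, i = r.lo) = i > r.hi ? nothing : (i, i + 1)

# SplittablesBase interface, used to divide the work between tasks.
SplittablesBase.amount(r::MyRange) = length(r)
function SplittablesBase.halve(r::MyRange)
    mid = r.lo + length(r) ÷ 2 - 1
    return (MyRange(r.lo, mid), MyRange(mid + 1, r.hi))
end

total = ThreadsX.sum(MyRange(1, 100); basesize = 8)
```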
ThreadsX.foreach — Function
ThreadsX.foreach(f, collections...; basesize, simd)
A parallel version of
for args in zip(collections...)
f(args...)
end
ThreadsX.foreach uses linear and Cartesian indexing of arrays appropriately. However, it is likely very slow for sparse arrays.
Although ThreadsX.foreach can be nested, it is highly recommended to use CartesianIndices or Iterators.product whenever applicable, so that ThreadsX.foreach can load-balance across multiple levels of loops. Otherwise (when nesting ThreadsX.foreach), it is important to set basesize for the outer loops to small values (e.g., basesize = 1).
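If nesting cannot be avoided, the advice above looks like this in practice. This is a sketch assuming ThreadsX.jl is installed: the outer loop over columns uses basesize = 1 so that each column can become its own task, and the inner ThreadsX.foreach parallelizes over the rows.

```julia
using ThreadsX

C = zeros(4, 3)

# Outer loop: basesize = 1 so the few outer iterations are spread across tasks.
ThreadsX.foreach(axes(C, 2); basesize = 1) do j
    # Inner loop: parallelized over the rows of column j.
    ThreadsX.foreach(axes(C, 1)) do i
        @inbounds C[i, j] = i * j
    end
end
```

Where the loop body allows it, a single ThreadsX.foreach over CartesianIndices(C) avoids the nesting entirely.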
Keyword Arguments
- basesize: The size of the base case.
- simd: false, true, :ivdep, or Val of one of them. If true/:ivdep, the innermost loop of each base case is annotated with @simd/@simd ivdep. This does not occur if false (default).
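A short sketch of the simd keyword, assuming ThreadsX.jl is installed. The second call uses the Val form with :ivdep, which additionally promises that iterations have no memory dependencies. That holds here because ys and xs are distinct arrays and each iteration touches only index i.

```julia
using ThreadsX

xs = rand(1_000)
ys = similar(xs)

# simd = true: the innermost loop of each base case is annotated with @simd.
ThreadsX.foreach(eachindex(ys, xs); simd = true) do i
    @inbounds ys[i] = 2 * xs[i]
end
doubled = copy(ys)

# simd = Val(:ivdep): annotated with @simd ivdep (no cross-iteration dependencies).
ThreadsX.foreach(eachindex(ys, xs); simd = Val(:ivdep)) do i
    @inbounds ys[i] = xs[i] + 1
end
```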
Examples
julia> using ThreadsX
julia> xs = 1:10; ys = similar(xs);
julia> ThreadsX.foreach(eachindex(ys, xs)) do I
@inbounds ys[I] = xs[I]
end
Since foreach is used only for its side effects, it typically has to be combined with eachindex.
To avoid cumbersome indexing, a powerful pattern is to use Referenceables.jl with foreach:
julia> using Referenceables # exports `referenceable`
julia> ThreadsX.foreach(referenceable(ys), xs) do y, x
y[] = x
end
Note that y[] does not need to be marked with @inbounds, as it is guaranteed to reference a valid location in the array.
The function above can also be written using map!. foreach is useful when, for example, there are multiple outputs:
julia> A = randn(10, 10); sums = similar(A); muls = similar(A);
julia> ThreadsX.foreach(referenceable(sums), referenceable(muls), A, A') do s, m, x, y
s[] = x + y
m[] = x * y
end
The code above fuses the computations sums .= A .+ A' and muls .= A .* A' and runs them in parallel.
foreach can also be used when the array is both input and output:
julia> ThreadsX.foreach(referenceable(A)) do x
x[] *= 2
end
Nested loops can be written using Iterators.product:
julia> A = 1:3
B = 1:2
C = zeros(3, 2);
julia> ThreadsX.foreach(referenceable(C), Iterators.product(A, B)) do c, (a, b)
c[] = a * b
end
@assert C == A .* reshape(B, 1, :)
This is equivalent to the following sequential code:
julia> for j in eachindex(B), i in eachindex(A)
@inbounds C[i, j] = A[i] * B[j]
end
@assert C == A .* reshape(B, 1, :)
This loop can also be expressed with explicit indexing (which is closer to the sequential code):
julia> ThreadsX.foreach(Iterators.product(eachindex(A), eachindex(B))) do (i, j)
@inbounds C[i, j] = A[i] * B[j]
end
@assert C == A .* reshape(B, 1, :)
Alternatively, the loop can iterate over CartesianIndices(C):
julia> ThreadsX.foreach(CartesianIndices(C)) do I
@inbounds C[I] = A[I[1]] * B[I[2]]
end
@assert C == A .* reshape(B, 1, :)
Note the difference in ordering between the two syntaxes, for j in eachindex(B), i in eachindex(A) and Iterators.product(eachindex(A), eachindex(B)). They are equivalent in the sense that eachindex(A) is the innermost loop in both cases.
ThreadsX.map — Function
ThreadsX.map(f, iterators...; basesize)
Parallelized map(f, iterators...). The input collections iterators must support SplittablesBase.halve.
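A minimal usage sketch, assuming ThreadsX.jl is installed: ThreadsX.map with one and with several input collections, checked against Base.map. Ranges are used because they support SplittablesBase.halve.

```julia
using ThreadsX

xs = 1:10
ys = 11:20

squares = ThreadsX.map(x -> x^2, xs)
# Several input collections, iterated in lockstep like Base.map.
sums = ThreadsX.map(+, xs, ys)
```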
ThreadsX.mapi — Function
ThreadsX.mapi(f, iterators...; basesize, ntasks)
Parallelized map(f, iterators...) that works with purely sequential iterators.
Note that calls to iterate on iterators are not parallelized; only f may be called in parallel. See also Transducers.NondeterministicThreading for more information.
Currently, the default basesize is 1. However, this may change in the future (e.g., it may be tuned automatically at run time).
Keyword Arguments
- basesize::Integer: The number of input elements to be accumulated in a buffer before being sent to a task.
- ntasks::Integer: The number of tasks @spawned. The default value is Threads.nthreads(). A number larger than Threads.nthreads() may be useful if the inner reducing function contains I/O and does not consume too many resources (e.g., memory).
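A sketch of ThreadsX.mapi over a purely sequential iterator, assuming ThreadsX.jl is installed. Lines read from an I/O stream can only be consumed one after another, so iteration stays sequential while the parse-and-square step may run in parallel. The check sorts the output so that it does not depend on any assumption about result order.

```julia
using ThreadsX

# A sequential iterator: lines of an in-memory I/O stream.
io = IOBuffer("1\n2\n3\n4\n5\n")

# Only the function call is parallelized; `ntasks` caps the number of spawned tasks.
squares = ThreadsX.mapi(line -> parse(Int, line)^2, eachline(io); ntasks = 2)
```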
ThreadsX.map! — Function
ThreadsX.map!(f, dest, inputs...; basesize, simd)
Parallelized map!. See also foreach.
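A minimal sketch, assuming ThreadsX.jl is installed, showing ThreadsX.map! together with the equivalent ThreadsX.foreach formulation that the cross-reference points to.

```julia
using ThreadsX

src = collect(1.0:8.0)
dest = similar(src)

# Write f.(src) into dest in parallel.
ThreadsX.map!(x -> 2x, dest, src)
via_map = copy(dest)

# The same computation with foreach and explicit indexing.
ThreadsX.foreach(eachindex(dest, src)) do i
    @inbounds dest[i] = 2 * src[i]
end
```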
Limitations
Note that the behavior is undefined when using a dest whose distinct indices refer to the same memory location. In particular:
- SubArray with overlapping indices. For example, view(zeros(2), [1, 1, 2, 2]) is unsupported, but view(zeros(10), [1, 5, 4, 7]) is safe to use.
- BitArray (currently unsupported).
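One way to guard against the overlapping-SubArray case is to check, before calling ThreadsX.map!, that no two indices of the view map to the same parent element. The sketch below assumes ThreadsX.jl is installed; `has_unique_targets` is a hypothetical helper written for this illustration, not part of ThreadsX. It does not cover the BitArray case, which is unsupported regardless.

```julia
using ThreadsX

# Hypothetical helper: true if every index of the view hits a distinct parent element.
has_unique_targets(v::SubArray) =
    allunique(LinearIndices(parent(v))[parentindices(v)...])

bad = view(zeros(2), [1, 1, 2, 2])    # indices 1 and 2 each appear twice
good = view(zeros(10), [1, 5, 4, 7])  # all targets distinct

# `good` is a valid destination for ThreadsX.map!.
if has_unique_targets(good)
    ThreadsX.map!(x -> x + 1, good, 1:4)
end
```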
ThreadsX.sort! — Function
ThreadsX.sort!(xs; [smallsort, smallsize, basesize, alg, lt, by, rev, order])
Sort a vector xs in parallel.
Examples
julia> using ThreadsX
julia> ThreadsX.sort!([9, 5, 2, 0, 1])
5-element Array{Int64,1}:
0
1
2
5
9
julia> ThreadsX.sort!([0:5;]; alg = ThreadsX.StableQuickSort, by = _ -> 1)
6-element Array{Int64,1}:
0
1
2
3
4
5
It is also possible to use Base.sort! directly by specifying alg to be one of the parallel sort algorithms provided by ThreadsX:
julia> sort!([9, 5, 2, 0, 1]; alg = ThreadsX.MergeSort)
5-element Array{Int64,1}:
0
1
2
5
9
This entry point may be slower than ThreadsX.sort! if the input is a very large array of integers with a small range. In this case, ThreadsX.sort! uses a parallel counting sort, whereas sort! uses a sequential counting sort.
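A sketch exercising the large, small-range integer case mentioned above, assuming ThreadsX.jl is installed. Which internal algorithm ThreadsX selects for a given input is an implementation detail, so the sketch only checks that the results agree with Base.sort for the default order, rev, and by.

```julia
using ThreadsX

xs = rand(1:100, 100_000)   # many duplicates, small range of integers

ascending = ThreadsX.sort(xs)
descending = ThreadsX.sort(xs; rev = true)
by_negation = ThreadsX.sort(xs; by = x -> -x)
```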
Keyword Arguments
- alg :: Base.Sort.Algorithm: ThreadsX.MergeSort, ThreadsX.QuickSort, ThreadsX.StableQuickSort, etc. Base.MergeSort and Base.QuickSort can be used as aliases of ThreadsX.MergeSort and ThreadsX.QuickSort.
- smallsort :: Union{Nothing,Base.Sort.Algorithm}: The algorithm to use for sorting small chunks of the input array.
- smallsize :: Union{Nothing,Integer}: Size of array under which the smallsort algorithm is used. nothing (default) means to use basesize.
- basesize :: Union{Nothing,Integer}: Granularity of parallelization. nothing (default) means to choose the default size.
- For the other keyword arguments (lt, by, rev, order), see Base.sort!.
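A sketch combining the keyword arguments listed above, assuming ThreadsX.jl is installed. The particular values are illustrative only: InsertionSort handles chunks smaller than smallsize, and basesize sets the granularity of parallelization.

```julia
using ThreadsX

xs = rand(10_000)

ys = ThreadsX.sort(xs;
                   alg = ThreadsX.QuickSort,
                   smallsort = InsertionSort,  # algorithm for small chunks
                   smallsize = 32,             # chunks below this size use smallsort
                   basesize = 1_000)           # granularity of parallelization
```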
ThreadsX.sort — Function
ThreadsX.sort(xs; [smallsort, smallsize, basesize, alg, lt, by, rev, order])
Return a sorted copy of xs, sorted in parallel. See also ThreadsX.sort!.
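A minimal sketch contrasting the copying and in-place variants, assuming ThreadsX.jl is installed.

```julia
using ThreadsX

xs = [3, 1, 2, 5, 4]

ys = ThreadsX.sort(xs)      # returns a new sorted vector
xs_after_sort = copy(xs)    # xs is still unsorted at this point

ThreadsX.sort!(xs)          # sorts xs in place
```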
ThreadsX.MergeSort — Constant
ThreadsX.MergeSort
Parallel merge sort algorithm.
See also ThreadsX.QuickSort.
Examples
ThreadsX.MergeSort is a Base.Sort.Algorithm, just like Base.MergeSort. It has a few properties for configuring the algorithm.
julia> using ThreadsX
julia> ThreadsX.MergeSort isa Base.Sort.Algorithm
true
julia> ThreadsX.MergeSort.smallsort === Base.Sort.DEFAULT_STABLE
true
The properties can be "set" by calling the algorithm object itself with keyword arguments, which returns a new algorithm object with the given properties:
julia> alg = ThreadsX.MergeSort(smallsort = QuickSort) :: Base.Sort.Algorithm;
julia> alg.smallsort == QuickSort
true
julia> alg2 = alg(basesize = 64, smallsort = InsertionSort);
julia> alg2.basesize
64
julia> alg2.smallsort === InsertionSort
true
Properties
- smallsort :: Base.Sort.Algorithm: Defaults to Base.Sort.DEFAULT_STABLE.
- smallsize :: Union{Nothing,Integer}: Size of array under which the smallsort algorithm is used. nothing (default) means to use basesize.
- basesize :: Union{Nothing,Integer}: Base case size of the parallel merge. nothing (default) means to choose the default size.
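A sketch of configuring all three properties and passing the resulting algorithm object to ThreadsX.sort, assuming ThreadsX.jl is installed. The property values are arbitrary choices for illustration. Records carry their original position in the second field. The check only verifies that the output is ordered by key and is a permutation of the input, without relying on any stability guarantee.

```julia
using ThreadsX

# New algorithm object with all three properties set.
alg = ThreadsX.MergeSort(smallsort = InsertionSort, smallsize = 16, basesize = 256)

# Records with many equal keys; the second field is the original position.
records = [(rand(1:5), i) for i in 1:5_000]
sorted_records = ThreadsX.sort(records; alg = alg, by = first)
```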
ThreadsX.QuickSort — Constant
ThreadsX.QuickSort
ThreadsX.StableQuickSort
Parallel quick sort algorithms.
See also ThreadsX.MergeSort.
Properties
- smallsort :: Base.Sort.Algorithm: Defaults to Base.Sort.DEFAULT_UNSTABLE.
- smallsize :: Union{Nothing,Integer}: Size of array under which the smallsort algorithm is used. nothing (default) means to use basesize.
- basesize :: Union{Nothing,Integer}: Granularity of parallelization. nothing (default) means to choose the default size.