Some Applications

Sampling from Data on Disk

Suppose we want to sample from large datasets stored on disk. StreamSampling.jl is very suited for this task. Let's simulate this task by generating some data in HDF5 and arrow formats and batch sampling them. You will need 20GB of space on disk for running this example. If not available you can set a smaller size for totaltpl.

We first generate the datasets and store them with

using StreamSampling, Random, ChunkSplitters
using HDF5, Arrow

const dtype = @NamedTuple{a::Float64, b::Float64, c::Float64, d::Float64}
const totaltpl = 10^10÷32
const chunktpl = 5*10^5
const numchunks = ceil(Int, totaltpl / chunktpl)

function generate_file(filename, format)
    if format == :hdf5
        h5open(filename, "w") do file
            dset = create_dataset(file, "data", dtype, (totaltpl,), chunk=(chunktpl,))
            Threads.@threads for i in 1:numchunks
                starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
                dset[starttpl:endtpl] = map(i -> (a=rand(), b=rand(), c=rand(), d=rand()), 
                                            1:endtpl-starttpl+1)
            end
        end
    elseif format == :arrow
        for i in 1:numchunks
            starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
            Arrow.append("random_data.arrow", (data=map(i -> (a=rand(), b=rand(), c=rand(), d=rand()), 
                                               1:endtpl-starttpl+1),);file=false)
        end
    end
end

!isfile("random_data.h5") && generate_file("random_data.h5", :hdf5)
!isfile("random_data.arrow") && generate_file("random_data.arrow", :arrow)

Then we can sample them using 1 thread with

function sample_file(filename, rng, n, alg, format)
    rs = ReservoirSampler{dtype}(rng, n, alg)
    if format == :hdf5
        h5open(filename, "r") do file
            dset = file["data"]
            for i in 1:numchunks
                starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
                data_chunk = dset[starttpl:endtpl]
                for d in data_chunk
                    fit!(rs, d)
                end
            end
        end
    elseif format == :arrow
        rs = ReservoirSampler{dtype}(rng, n, alg)
        data = Arrow.Table(filename).data
        @inbounds for i in 1:length(data)
            fit!(rs, data[i])
        end
    end
    return rs
end

rng = Xoshiro(42)
@time rs = sample_file("random_data.h5", rng, 10^7, AlgRSWRSKIP(), :hdf5)
 43.514238 seconds (937.21 M allocations: 42.502 GiB, 2.57% gc time)
@time rs = sample_file("random_data.arrow", rng, 10^7, AlgRSWRSKIP(), :arrow)
 38.635389 seconds (1.25 G allocations: 33.500 GiB, 3.52% gc time, 75763 lock conflicts)

We can try to improve the performance by using multiple threads. Here, I started Julia with julia -t4 --gcthreads=4,1 on my machine

function psample_file(filename, rngs, n, alg, format)
    rsv = [ReservoirSampler{dtype}(rngs[i], n, alg) for i in 1:Threads.nthreads()]
    if format == :hdf5
        h5open(filename, "r") do file
            dset = file["data"]
            for c in chunks(1:numchunks; n=ceil(Int, numchunks/Threads.nthreads()))
                Threads.@threads for (j, i) in collect(enumerate(c))
                    starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
                    data_chunk, rs = dset[starttpl:endtpl], rsv[j]
                    for d in data_chunk
                        fit!(rs, d)
                    end
                end
            end
        end
    elseif format == :arrow
        data = Arrow.Table(filename).data
        Threads.@threads for (i,c) in enumerate(chunks(1:length(data), n=Threads.nthreads()))
            @inbounds for j in c
                fit!(rsv[i], data[j])
            end
        end
    end
    return merge(rsv...)
end

rngs = [Xoshiro(i) for i in 1:Threads.nthreads()]
@time rs = psample_file("random_data.h5", rngs, 10^7, AlgRSWRSKIP(), :hdf5)
 23.240628 seconds (937.23 M allocations: 45.185 GiB, 9.52% gc time, 9375 lock conflicts)
@time rs = psample_file("random_data.arrow", rngs, 10^7, AlgRSWRSKIP(), :arrow)
 5.868995 seconds (175.91 k allocations: 3.288 GiB, 6.44% gc time, 64714 lock conflicts)

As you can see, the speed-up is not linear in the number of threads for an hdf5 file. This is mainly due to the fact that accessing the chunks is single-threaded, so one would need to use MPI.jl as explained at HDF5.jl/stable/mpi/ to improve the multi-threading performance. Though, we are already sampling at 500MB/s, which is not bad! Using Arrow.jl gives an even better performance, and a scalability which is better than linear somehow, reaching a 2GB/s sampling speed!

Monitoring

Suppose to receive data about some process in the form of a stream and you want to detect if anything is going wrong in the data being received. A reservoir sampling approach could be useful to evaluate properties on the data stream. This is a demonstration of such a use case using StreamSampling.jl. We will assume that the monitored statistic in this case is the mean of the data, and you want that to be lower than a certain threshold otherwise some malfunctioning is expected

using StreamSampling, Statistics, Random

function monitor(stream, thr)
    rng = Xoshiro(42)
    # we use a reservoir sample of 10^4 elements
    rs = ReservoirSampler{Int}(rng, 10^4)
    # we loop over the stream and fit the data in the reservoir
    for (i, e) in enumerate(stream)
        fit!(rs, e)
        # we check the mean value every 1000 iterations
        if iszero(mod(i, 1000)) && mean(value(rs)) >= thr
            return rs
        end
    end
end

We use some toy data for illustration

stream = 1:10^8; # the data stream
thr = 2*10^7; # the threshold for the mean monitoring

Then, we run the monitoring

rs = monitor(stream, thr);

The number of observations until the detection is triggered is given by

nobs(rs)

which is very close to the true value of 4*10^7 - 1 observations.

Note that in this case we could use an online mean methods, instead of holding all the sample into memory. However, the approach with the sample is more general because it allows to estimate any statistic about the stream.