Event Latency
There is a built-in DelayQueue if you need a Store with latency between put! and take! events. Here, however, we show how you could build one yourself. If you adapt this example into a particularly useful type of latency store, please contribute it to the library through a pull request.
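For comparison, the built-in type mentioned above can be used roughly as follows. This is a minimal sketch, assuming the constructor form DelayQueue{T}(env, delay); the producer and consumer functions, the received vector, and the delay of 10.0 are hypothetical names and values chosen for illustration.

```julia
using ConcurrentSim
using ResumableFunctions

# Hypothetical producer: puts three messages into the queue, 2.0 time units apart.
@resumable function producer(env::Simulation, queue)
    for item in ["a", "b", "c"]
        put!(queue, "$item sent at $(now(env))")
        @yield timeout(env, 2.0)
    end
end

# Hypothetical consumer: records when each message becomes available.
@resumable function consumer(env::Simulation, queue, received)
    while true
        msg = @yield take!(queue)
        push!(received, (now(env), msg))
    end
end

sim = Simulation()
queue = DelayQueue{String}(sim, 10.0)  # assumed constructor form; 10.0 is an arbitrary delay
received = Tuple{Float64,String}[]
@process producer(sim, queue)
@process consumer(sim, queue, received)
run(sim, 30.0)
```

After the run, received holds one (time, message) pair per message, which makes it easy to compare the time of take! with the time of put!.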
Description
In this example we show how to separate the time delay between processes from the processes themselves. We model a communications channel, called a Cable, where a sender sends a message every SEND_PERIOD time units and a receiver listens every RECEIVE_PERIOD time units. Each message spends DELAY_DURATION time units in the cable before it reaches the receiver. In the code that follows, the delay is not a named constant: it is passed to the Cable constructor as 10.0.
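The timing described above can be worked out by hand before running any simulation. The following is a rough sketch in plain Julia, without ConcurrentSim; expected_receive_times is a hypothetical helper, and it assumes that the receiver waits one receive period and then blocks until the next message leaves the cable, and that nothing scheduled exactly at the end time is processed.

```julia
# Hypothetical helper, not part of ConcurrentSim: predicts (time received, time sent)
# for each message, given the periods, the cable delay, and the simulation duration.
function expected_receive_times(send_period, receive_period, delay, duration)
    times = Tuple{Float64,Float64}[]
    last_receive = 0.0                          # when the receiver last got a message
    k = 1
    while true
        sent = k * send_period                  # k-th message is sent
        available = sent + delay                # message leaves the cable
        ready = last_receive + receive_period   # receiver starts listening again
        received = max(ready, available)        # receiver blocks until a message is there
        received < duration || break            # assumption: the run stops before `duration`
        push!(times, (received, sent))
        last_receive = received
        k += 1
    end
    return times
end

# Values used in this example: send every 5.0, listen every 3.0, delay 10.0, run for 100.0.
schedule = expected_receive_times(5.0, 3.0, 10.0, 100.0)
```

With these values the receiver is always ready before the next message arrives, so every receive time is simply the send time plus the delay.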
Load Packages
using ConcurrentSim
using ResumableFunctions
import Base: put!, take!
Define Constants
const SIM_DURATION = 100.0
const SEND_PERIOD = 5.0
const RECEIVE_PERIOD = 3.0
Define Cable model
The Cable contains a reference to the simulation it is part of, the delay that messages experience, and a store that holds the sent messages.
mutable struct Cable
    env::Simulation
    delay::Float64
    store::Store{String}
    function Cable(env::Simulation, delay::Float64)
        return new(env, delay, Store{String}(env))
    end
end
The latency function is a generator that yields two events: first a timeout that represents the transmission delay, then a put! event when the message is placed in the store.
@resumable function latency(env::Simulation, cable::Cable, value::String)
    @yield timeout(cable.env, cable.delay)  # transmission delay
    @yield put!(cable.store, value)         # message becomes available to the receiver
end
The put! and take! functions allow interaction with the cable. Note that these are not @resumable: they are ordinary functions that return the event resulting from the operation, which a process can then @yield, rather than a generator that would still have to be scheduled.
function put!(cable::Cable, value::String)
    @process latency(cable.env, cable, value)  # schedules all events generated by latency
end
function take!(cable::Cable)
    take!(cable.store)  # returns the event that yields an element of the cable store
end
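To illustrate the point about returning an event rather than being @resumable, here is a minimal sketch using a plain Store. The consumer and producer functions, the seen vector, and the timings are hypothetical and only serve the illustration.

```julia
using ConcurrentSim
using ResumableFunctions

# Hypothetical consumer: take! returns an event right away; @yield waits for it to fire.
@resumable function consumer(env::Simulation, store::Store{String}, seen)
    ev = take!(store)    # an event object; no item has been received yet
    item = @yield ev     # suspends the process; evaluates to the item once it is available
    push!(seen, (now(env), item))
end

# Hypothetical producer: stores one item after 4.0 time units.
@resumable function producer(env::Simulation, store::Store{String})
    @yield timeout(env, 4.0)
    @yield put!(store, "hello")
end

sim = Simulation()
store = Store{String}(sim)
seen = Any[]
@process consumer(sim, store, seen)
@process producer(sim, store)
run(sim, 10.0)
```

Because take! for the cable forwards to take! on its store, a process can @yield its return value in exactly the same way.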
The sender and receiver generators yield events to the simulator.
@resumable function sender(env::Simulation, cable::Cable)
    while true
        @yield timeout(env, SEND_PERIOD)      # wait until the next send time
        value = "sender sent this at $(now(env))"
        put!(cable, value)                    # not yielded: the sender does not wait for delivery
    end
end
@resumable function receiver(env::Simulation, cable::Cable)
    while true
        @yield timeout(env, RECEIVE_PERIOD)   # wait until the next listen time
        msg = @yield take!(cable)             # block until a message leaves the cable
        println("Received this at $(now(env)) while $msg")
    end
end
Create simulation, register events, and run!
env = Simulation()
cable = Cable(env, 10.0)  # each message spends 10.0 time units in the cable
@process sender(env, cable)
@process receiver(env, cable)
run(env, SIM_DURATION)
# output
Received this at 15.0 while sender sent this at 5.0
Received this at 20.0 while sender sent this at 10.0
Received this at 25.0 while sender sent this at 15.0
Received this at 30.0 while sender sent this at 20.0
Received this at 35.0 while sender sent this at 25.0
Received this at 40.0 while sender sent this at 30.0
Received this at 45.0 while sender sent this at 35.0
Received this at 50.0 while sender sent this at 40.0
Received this at 55.0 while sender sent this at 45.0
Received this at 60.0 while sender sent this at 50.0
Received this at 65.0 while sender sent this at 55.0
Received this at 70.0 while sender sent this at 60.0
Received this at 75.0 while sender sent this at 65.0
Received this at 80.0 while sender sent this at 70.0
Received this at 85.0 while sender sent this at 75.0
Received this at 90.0 while sender sent this at 80.0
Received this at 95.0 while sender sent this at 85.0