Event-Driven automaton
Disclaimer: This article describes unobvious solution for inobvious problem. Before you
roast meimplement it, I recommend reading this article till the end and think twice.
Hi everyone! When we code, quite often we have to deal with a state. One of such situations is objects lifecycle. Manage an object with multiple possible states can be a really non-trivial task. Add here concurrent execution and you have a very serious problem. There is an effective and natural solution to this. In this article I’ll tell about event-driven finite-state machine and how to build it in Go.
Why to manage the state?
For a start, let’s define the state itself. The simplest example of state are files and connections. It’s not possible to just read the file. You have to open it before you start and you also can must close it after. It means that your current action relies on result of previous one: read depends on open. Saved result is the state.
The main problem of state is complexity. Any state in your code automatically complicates it. You have to persist the results of actions in memory and implement various checks in your logic. That’s why stateless architectures are so attractive to developers - nobody wants to deal with troubles complexity. If results of your actions don’t affect your logic, you don’t need the state.
However, there is one property that makes one reckon with difficulties. The state forces you to follow a specific order of actions. Overall, you should avoid such situations, but it’s not always possible. The example - lifecycle of objects in your program. Thanks to proper state management, it’s possible to get predictable behavior for objects with complex lifecycle.
Now let’s figure out how to make it cool.
Automaton as solution
When people talk about states, finite automata immediately pop in my mind. It is logical, because the automaton is the most natural way to control the state.
I won’t dive deep into automata theory. There’s plenty of information around the web.
If you try to look for examples of finite automata for Go, you will run into lexer by Rob Pike. Great example of automaton that uses processed data as its alphabet. It means that state transition is triggered by text, which lexer processes. Elegant solution to a specific problem.
The main thing you should understand - automaton is a solution to a strictly specific problem. So, before you consider it as a solution to all your problems, you should completely understand your case. More specific, the entity you want to manage:
- states - lifecycle;
- events - what exactly triggers transition into every state;
- results - output data;
- execution mode (sync/async);
- basic use cases.
Lexer is brilliant, but it changes state only because of data it processes. What can we do if state transition should be triggered by the user? This is where event-driven automaton can help out.
Real example
To make it clearer, I will analyze an example from the phono
library.
You can have a look at introduction article for full context. It’s not necessary but help understand better what we are managing.
And what do we manage?
The core of phono
is a DSP pipeline. It consists of three stages of processing. Every stage can have various number of components:
pipe.Pump
- mandatory receive sound stage, always only one per-pipe.pipe.Processor
- optional processing stage, from 0 to N components.pipe.Sink
- mandatory send stage, from 1 to N components.
The lifecycle of the pipeline is exactly what we are going to manage.
Lifecycle
Here’s the state graph of pipe.Pipe
.
Transitions caused by execution logic defined with italic. Event-caused transitions are written in bold. The diagram shows that states also could be of two kinds:
- idle state -
ready
andpaused
, exit only on event. - active state -
running
andpausing
, exit on event and execution logic.
Before we dive deep into code, I want to show an example of all states in action:
// PlayWav at passed path with default portaudio device.
func PlayWav(wavFile string) error {
bufferSize := phono.BufferSize(512) // buffer size for data transition.
w, err := wav.NewPump(wavFile, bufferSize) // new wav pump.
if err != nil {
return err
}
pa := portaudio.NewSink( // new portaudio sink.
bufferSize,
w.WavSampleRate(),
w.WavNumChannels(),
)
p := pipe.New( // new pipe.Pipe with ready initial state.
w.WavSampleRate(),
pipe.WithPump(w),
pipe.WithSinks(pa),
)
p.Run() // transition into running state.
errc := p.Pause() // transition into pausing state.
err = pipe.Wait(errc) // waiting for transition into paused state.
if err != nil {
return err
}
errc = p.Resume() // transition into running state.
err = pipe.Wait(errc) // waiting for transition into ready state.
if err != nil {
return err
}
return pipe.Wait(p.Close()) // free resources in the end.
}
Now we will walk through the implementation.
All sources are available at the repository.
States and events
Let’s begin with the essentials.
// state defines one of possible pipeline states.
type state interface {
listen(*Pipe, target) (state, target) // listen for new transition.
transition(*Pipe, eventMessage) (state, error) // transition function.
}
// idleState can exit on event only.
type idleState interface {
state
}
// activeState can exit on event and execution logic.
type activeState interface {
state
sendMessage(*Pipe) state // send new message.
}
// states types.
type (
idleReady struct{}
activeRunning struct{}
activePausing struct{}
idlePaused struct{}
)
// states variables.
var (
ready idleReady
running activeRunning
paused idlePaused
pausing activePausing
)
Thanks to state types, all transitions are defined separately per-state. It allows to avoid huge sausage transition function with nested switch
operator. States themselves doesn’t contain any data or logic. We can declare package-level variables to skip unnecessary allocations. state
interface is needed for polymorphism. I’ll tell more about activeState
and idleState
in a moment.
Another important part of automaton - events.
// event type.
type event int
// events constants.
const (
run event = iota
pause
resume
push
measure
cancel
)
// target defines end state for triggered event.
type target struct {
state idleState // target state.
errc chan error // channel with errors. closed when target state is reached.
}
// eventMessage is sent into automaton when user triggers event.
type eventMessage struct {
event // event type
params params // new params
components []string // id of components
target // target for this event
}
To understand why we need target
type, let’s consider a simple example. We create new pipeline. It’s in the ready
state. Now we start it with p.Run()
function. Event run
is sent into automaton and pipeline transitions to the running
state. How do we know when pipeline is done? Here comes target
type to the rescue. It tells which state we should wait after event. In our example, pipeline will transition to ready
state after the job is done. Example explained on diagram:
Now more details about state types. Precisely, about idleState
and activeState
c interfaces. Let’s have a look on listen(*Pipe, target) (state, target)
for different types:
// listen for transition from ready state.
func (s idleReady) listen(p *Pipe, t target) (state, target) {
return p.idle(s, t)
}
// listen for transition from listen.
func (s activeRunning) listen(p *Pipe, t target) (state, target) {
return p.active(s, t)
}
pipe.Pipe
has different methods for transition waiting! What’s there?
// idle waits for transition from idle state. Listens only the event state.
func (p *Pipe) idle(s idleState, t target) (state, target) {
if s == t.state || s == ready {
t = t.dismiss() // target reached, dismiss.
}
for {
var newState state
var err error
select {
case e := <-p.events: // waiting for event.
newState, err = s.transition(p, e) // call transition function.
if err != nil {
e.target.handle(err)
} else if e.hasTarget() {
t.dismiss()
t = e.target
}
}
if s != newState {
return newState, t // return, we have a new state.
}
}
}
// active waits for transition from active state. Listens events channel
// and channels needed for execution.
func (p *Pipe) active(s activeState, t target) (state, target) {
for {
var newState state
var err error
select {
case e := <-p.events: // wait for event.
newState, err = s.transition(p, e) // do the transition.
if err != nil { // successful transition?
e.target.handle(err) // no, send error.
} else if e.hasTarget() { // yes, have new target?
t.dismiss() // cancel current one.
t = e.target // replace with new.
}
case <-p.provide: // wait for new message request.
newState = s.sendMessage(p) // send new message.
case err, ok := <-p.errc: // wait for errors.
if ok {
interrupt(p.cancel) // interrupt execution.
t.handle(err) // send error.
}
return ready, t // go to ready state.
}
if s != newState {
return newState, t // exit if transition happened.
}
}
}
Thus, we can listen to different channels in different states. It allows us for instance, to not send messages during pause. We just ignore corresponding channel.
Constructor and automaton start
// New creates pipeline and starts automaton with ready as initial state.
func New(sampleRate phono.SampleRate, options ...Option) *Pipe {
p := &Pipe{
UID: phono.NewUID(),
sampleRate: sampleRate,
log: log.GetLogger(),
processors: make([]*processRunner, 0),
sinks: make([]*sinkRunner, 0),
metrics: make(map[string]measurable),
params: make(map[string][]phono.ParamFunc),
feedback: make(map[string][]phono.ParamFunc),
events: make(chan eventMessage, 1), // events channel.
cancel: make(chan struct{}), // cancellation channel.
provide: make(chan struct{}),
consume: make(chan message),
}
for _, option := range options { // apply options.
option(p)()
}
go p.loop() // start automaton.
return p
}
Apart from functional options here we also start automaton’s loop in a separate goroutine. Let’s have a look:
// loop is executed until nil state is returned.
func (p *Pipe) loop() {
var s state = ready // initial state
t := target{}
for s != nil {
s, t = s.listen(p, t) // wait for transition into a new state.
p.log.Debug(fmt.Sprintf("%v is %T", p, s))
}
t.dismiss() // dismiss last target.
close(p.events) // close events channel.
}
// listen waits for transition from ready state.
func (s idleReady) listen(p *Pipe, t target) (state, target) {
return p.idle(s, t)
}
// transition returns new state depending on triggered event.
func (s idleReady) transition(p *Pipe, e eventMessage) (state, error) {
switch e.event {
case cancel:
interrupt(p.cancel)
return nil, nil
case push:
e.params.applyTo(p.ID())
p.params = p.params.merge(e.params)
return s, nil
case measure:
for _, id := range e.components {
e.params.applyTo(id)
}
return s, nil
case run:
if err := p.start(); err != nil {
return s, err
}
return running, nil
}
return s, ErrInvalidState
}
Pipeline is created and looking forward for events.
It’s time to work
Call p.Run()
!
// Run sends run event into pipeline.
// Execution of this method after pipe.Close causes panic.
func (p *Pipe) Run() chan error {
runEvent := eventMessage{
event: run,
target: target{
state: ready, // target idle state.
errc: make(chan error, 1),
},
}
p.events <- runEvent
return runEvent.target.errc
}
// listen waits for transition from running state.
func (s activeRunning) listen(p *Pipe, t target) (state, target) {
return p.active(s, t)
}
// transition returns new state depending on triggered event.
func (s activeRunning) transition(p *Pipe, e eventMessage) (state, error) {
switch e.event {
case cancel:
interrupt(p.cancel)
err := Wait(p.errc)
return nil, err
case measure:
e.params.applyTo(p.ID())
p.feedback = p.feedback.merge(e.params)
return s, nil
case push:
e.params.applyTo(p.ID())
p.params = p.params.merge(e.params)
return s, nil
case pause:
return pausing, nil
}
return s, ErrInvalidState
}
// sendMessage generates new message.
func (s activeRunning) sendMessage(p *Pipe) state {
p.consume <- p.newMessage()
return s
}
running
generates new messages and is executed until pipeline is done.
Time for a pause
During pipeline execution we can pause it. In this state pipe won’t generate new messages. For this we need to call p.Pause()
.
// Pause sends pause event into pipeline.
// Execution of this method after pipe.Close causes panic.
func (p *Pipe) Pause() chan error {
pauseEvent := eventMessage{
event: pause,
target: target{
state: paused, // target idle state.
errc: make(chan error, 1),
},
}
p.events <- pauseEvent
return pauseEvent.target.errc
}
// listen waits for transition from pausing state.
func (s activePausing) listen(p *Pipe, t target) (state, target) {
return p.active(s, t)
}
// transition returns new state depending on triggered event.
func (s activePausing) transition(p *Pipe, e eventMessage) (state, error) {
switch e.event {
case cancel:
interrupt(p.cancel)
err := Wait(p.errc)
return nil, err
case measure:
e.params.applyTo(p.ID())
p.feedback = p.feedback.merge(e.params)
return s, nil
case push:
e.params.applyTo(p.ID())
p.params = p.params.merge(e.params)
return s, nil
}
return s, ErrInvalidState
}
// sendMessage generates new message. The message contains param function that is called
// when message is recieved. sendMessage blocks until all receivers get the message.
// Thus it's guaranteed that:
// 1. New messages doesn't generate
// 2. All components processed last message
func (s activePausing) sendMessage(p *Pipe) state {
m := p.newMessage()
if len(m.feedback) == 0 {
m.feedback = make(map[string][]phono.ParamFunc)
}
var wg sync.WaitGroup
wg.Add(len(p.sinks)) // add all sinks.
for _, sink := range p.sinks {
param := phono.ReceivedBy(&wg, sink.ID()) // param function.
m.feedback = m.feedback.add(param)
}
p.consume <- m // send message.
wg.Wait() // wait delivered.
return paused
}
When all receivers get the message pipeline will transit to the paused
state. If message is the last one, then pipe will go into ready
state.
Let’s work again!
To exit paused
state, we need to call p.Resume()
.
// Resume sends resume event into pipeline.
// Execution of this method after pipe.Close causes panic.
func (p *Pipe) Resume() chan error {
resumeEvent := eventMessage{
event: resume,
target: target{
state: ready, // target idle state.
errc: make(chan error, 1),
},
}
p.events <- resumeEvent
return resumeEvent.target.errc
}
// listen waits for transition from paused state.
func (s idlePaused) listen(p *Pipe, t target) (state, target) {
return p.idle(s, t)
}
// transition returns new state depending on triggered event.
func (s idlePaused) transition(p *Pipe, e eventMessage) (state, error) {
switch e.event {
case cancel:
interrupt(p.cancel)
err := Wait(p.errc)
return nil, err
case push:
e.params.applyTo(p.ID())
p.params = p.params.merge(e.params)
return s, nil
case measure:
for _, id := range e.components {
e.params.applyTo(id)
}
return s, nil
case resume:
return running, nil
}
return s, ErrInvalidState
}
Everything is trivial here. Pipeline again goes into running
state.
Wrap it up
Pipeline can be stopped during any state. There is p.Close()
method for this.
// Close sends cancel event into pipeline.
// Consequent execution of this method causes panic.
// Mandatory to call it to release resources.
func (p *Pipe) Close() chan error {
resumeEvent := eventMessage{
event: cancel,
target: target{
state: nil, // target idle state.
errc: make(chan error, 1),
},
}
p.events <- resumeEvent
return resumeEvent.target.errc
}
Who needs this?
Definitely not everyone. To understand how to manage the state it’s essential to understand your problem first. There are strictly two conditions when event-driven state machine can be utilized:
- Complex lifecycle - there are three and more states with non-linear transitions.
- Asynchronous execution.
Disregards event-driven automaton solves the problem, but it’s still quite complex abstraction. That’s why you should use it with maximum caution and only after complete understanding of its pros and cons.