Event-Driven automaton

Disclaimer: This article describes a non-obvious solution to a non-obvious problem. Before you implement it, I recommend reading the article to the end and thinking twice.

Hi everyone! When we write code, we often have to deal with state. One such situation is the lifecycle of an object. Managing an object with multiple possible states can be a really non-trivial task. Add concurrent execution and you have a very serious problem. There is an effective and natural solution to this. In this article I’ll talk about the event-driven finite-state machine and how to build one in Go.

Why manage state?

To start, let’s define state itself. The simplest examples of state are files and connections. You cannot just read a file. You have to open it before you start, and you must close it afterwards. This means that your current action relies on the result of a previous one: read depends on open. The saved result is the state.
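To make this concrete, here is a minimal sketch of an object whose read only works after open. The resource type and errNotOpen are hypothetical names invented for illustration; they are not part of any library.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotOpen is returned when Read is called on a resource that is not open.
var errNotOpen = errors.New("resource is not open")

// resource is a hypothetical stand-in for a file or a connection.
type resource struct {
	open bool // the saved result of the previous action: this is the state.
}

// Open and Close change the state.
func (r *resource) Open()  { r.open = true }
func (r *resource) Close() { r.open = false }

// Read succeeds only if the resource is currently open.
func (r *resource) Read() (string, error) {
	if !r.open {
		return "", errNotOpen
	}
	return "data", nil
}

func main() {
	r := &resource{}
	_, err := r.Read() // read before open is rejected.
	fmt.Println(err)

	r.Open()
	data, err := r.Read() // now the read is allowed.
	fmt.Println(data, err)
	r.Close()
}
```

Even in this tiny case every method has to either check or update the open flag, and that is exactly the kind of bookkeeping that grows with the number of states.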

The main problem with state is complexity. Any state in your code automatically complicates it. You have to keep the results of actions in memory and add various checks to your logic. That’s why stateless architectures are so attractive to developers: nobody wants to deal with complexity. If the results of your actions don’t affect your logic, you don’t need state.

However, there is one property that makes state worth the difficulties. State forces you to follow a specific order of actions. In general, you should avoid such situations, but that’s not always possible. One example is the lifecycle of objects in your program. With proper state management, objects with a complex lifecycle behave predictably.

Now let’s figure out how to make it cool.

Automaton as solution

When people talk about state, finite automata immediately come to my mind. That is logical, because an automaton is the most natural way to control state.

I won’t dive deep into automata theory. There’s plenty of information around the web.

If you look for examples of finite automata in Go, you will run into the lexer by Rob Pike. It is a great example of an automaton that uses the data it processes as its alphabet. This means that state transitions are triggered by the text the lexer processes. It is an elegant solution to a specific problem.
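The core idea of that lexer is a state represented as a function that returns the next state. Below is a much smaller sketch in the same spirit, not the original lexer: it only splits a string into words. The scanner type and the scanSpace, scanWord and scan functions are names made up for this example.

```go
package main

import "fmt"

// scanner holds the input and the tokens found so far.
type scanner struct {
	input  string
	pos    int // current position in the input.
	start  int // start of the pending token.
	tokens []string
}

// stateFn is a state: it consumes input and returns the next state.
type stateFn func(*scanner) stateFn

// emit stores the pending token, if there is one.
func (s *scanner) emit() {
	if s.pos > s.start {
		s.tokens = append(s.tokens, s.input[s.start:s.pos])
	}
	s.start = s.pos
}

// scanSpace skips spaces; a non-space byte triggers the transition to scanWord.
func scanSpace(s *scanner) stateFn {
	for s.pos < len(s.input) {
		if s.input[s.pos] != ' ' {
			s.start = s.pos
			return scanWord
		}
		s.pos++
	}
	return nil // end of input stops the automaton.
}

// scanWord consumes a word; a space triggers the transition to scanSpace.
func scanWord(s *scanner) stateFn {
	for s.pos < len(s.input) {
		if s.input[s.pos] == ' ' {
			s.emit()
			return scanSpace
		}
		s.pos++
	}
	s.emit()
	return nil
}

// scan runs the automaton until a nil state is returned.
func scan(input string) []string {
	s := &scanner{input: input}
	for state := stateFn(scanSpace); state != nil; {
		state = state(s)
	}
	return s.tokens
}

func main() {
	fmt.Println(scan("  hello  go "))
}
```

Every transition here is decided by the next byte of the input, and nothing outside the data can change the state.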

The main thing to understand is that an automaton is a solution to a strictly specific problem. So, before you consider it a solution to all your problems, you should completely understand your case. More specifically, you should understand the entity you want to manage.

The lexer is brilliant, but it changes state only because of the data it processes. What can we do if a state transition should be triggered by the user? This is where an event-driven automaton can help.
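In its simplest form, an event-driven automaton is just a mapping from the current state and an incoming event to the next state. The sketch below assumes a simplified lifecycle with three states and three events; the names and the table are illustrative and are not the phono implementation.

```go
package main

import "fmt"

type state int
type event int

// Simplified set of states.
const (
	ready state = iota
	running
	paused
)

// Events triggered by the user.
const (
	run event = iota
	pause
	resume
)

// transitions maps a state and an event to the next state.
var transitions = map[state]map[event]state{
	ready:   {run: running},
	running: {pause: paused},
	paused:  {resume: running},
}

// next returns the state after the event and reports whether
// the event is allowed in the current state.
func next(s state, e event) (state, bool) {
	n, ok := transitions[s][e]
	if !ok {
		return s, false // invalid event: stay in the same state.
	}
	return n, true
}

func main() {
	s := ready
	for _, e := range []event{run, pause, resume} {
		s, _ = next(s, e)
	}
	fmt.Println(s == running)

	_, ok := next(ready, pause) // pause makes no sense before run.
	fmt.Println(ok)
}
```

A table like this is enough for synchronous code. The rest of the article deals with the harder case, where events arrive concurrently with the work the object is doing.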

Real example

To make it clearer, I will analyze an example from the phono library.

You can have a look at the introduction article for full context. It’s not necessary, but it helps to better understand what we are managing.

And what do we manage?

The core of phono is a DSP pipeline. It consists of three stages of processing. Every stage can have a different number of components:

  1. pipe.Pump - mandatory stage that receives sound, always exactly one per pipe.
  2. pipe.Processor - optional processing stage, from 0 to N components.
  3. pipe.Sink - mandatory stage that sends sound, from 1 to N components.
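To give a feeling for how the three stages fit together, here is a rough sketch built on plain channels. It does not use the phono API: pump, gain and sink are hypothetical functions, buffers are plain slices of float64, and only one processor and one sink are shown.

```go
package main

import "fmt"

// pump produces buffers of samples until the source is exhausted.
func pump(buffers [][]float64) <-chan []float64 {
	out := make(chan []float64)
	go func() {
		defer close(out)
		for _, b := range buffers {
			out <- b
		}
	}()
	return out
}

// gain is a processor: it scales every sample by factor.
func gain(in <-chan []float64, factor float64) <-chan []float64 {
	out := make(chan []float64)
	go func() {
		defer close(out)
		for b := range in {
			scaled := make([]float64, len(b))
			for i, v := range b {
				scaled[i] = v * factor
			}
			out <- scaled
		}
	}()
	return out
}

// sink consumes all buffers and returns the collected samples.
func sink(in <-chan []float64) []float64 {
	var samples []float64
	for b := range in {
		samples = append(samples, b...)
	}
	return samples
}

func main() {
	source := [][]float64{{1, 2}, {3, 4}}
	fmt.Println(sink(gain(pump(source), 0.5)))
}
```

This version can only run from start to finish. Pausing, resuming and cancelling it from the outside is what requires a managed lifecycle.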

The lifecycle of the pipeline is exactly what we are going to manage.

Lifecycle

Here’s the state graph of pipe.Pipe.

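Transitions caused by execution logic are shown in italics. Event-caused transitions are shown in bold. The diagram shows that states can be of two kinds: idle states, which can be exited only by an event, and active states, which can be exited by an event or by execution logic.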

Before we dive deep into code, I want to show an example of all states in action:

// PlayWav at passed path with default portaudio device.
func PlayWav(wavFile string) error {
	bufferSize := phono.BufferSize(512) // buffer size for data transition.

	w, err := wav.NewPump(wavFile, bufferSize) // new wav pump.
	if err != nil {
		return err
	}
	pa := portaudio.NewSink( // new portaudio sink.
		bufferSize,
		w.WavSampleRate(),
		w.WavNumChannels(),
	)
	p := pipe.New(          // new pipe.Pipe with ready initial state.
		w.WavSampleRate(),
		pipe.WithPump(w),
		pipe.WithSinks(pa),
	)

	p.Run()                 // transition into running state.

	errc := p.Pause()       // transition into pausing state.

	err = pipe.Wait(errc)   // waiting for transition into paused state.
	if err != nil {
		return err
	}

	errc = p.Resume()       // transition into running state.

	err = pipe.Wait(errc)   // waiting for transition into ready state.
	if err != nil {
		return err
	}

	return pipe.Wait(p.Close()) // free resources in the end.
}

Now we will walk through the implementation.

All sources are available at the repository.

States and events

Let’s begin with the essentials.

// state defines one of possible pipeline states.
type state interface {
	listen(*Pipe, target) (state, target)           // listen for new transition.
	transition(*Pipe, eventMessage) (state, error)  // transition function.
}

// idleState can exit on event only.
type idleState interface {
	state
}

// activeState can exit on event and execution logic.
type activeState interface {
	state
	sendMessage(*Pipe) state    // send new message.
}

// states types.
type (
	idleReady     struct{}
	activeRunning struct{}
	activePausing struct{}
	idlePaused    struct{}
)

// states variables.
var (
	ready   idleReady
	running activeRunning
	paused  idlePaused
	pausing activePausing
)

Thanks to the state types, transitions are defined separately for each state. This avoids a huge transition function with nested switch statements. The states themselves don’t contain any data, so we can declare them as package-level variables to skip unnecessary allocations. The state interface is needed for polymorphism. I’ll say more about activeState and idleState in a moment.
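The following sketch shows why empty structs work well as states: each type carries its own transition method, and two values of the same state type always compare equal, so a check like s != newState is enough to detect a transition. The interface is reduced to a single hypothetical next method that takes the event as a string.

```go
package main

import "fmt"

// state is a reduced, hypothetical version of the state interface.
type state interface {
	next(event string) state
}

// State types without any data.
type (
	idleReady     struct{}
	activeRunning struct{}
)

// Package-level state values.
var (
	ready   idleReady
	running activeRunning
)

// next defines transitions from the ready state only.
func (s idleReady) next(event string) state {
	if event == "run" {
		return running
	}
	return s
}

// next defines transitions from the running state only.
func (s activeRunning) next(event string) state {
	if event == "done" {
		return ready
	}
	return s
}

func main() {
	var s state = ready

	n := s.next("pause") // not handled in ready: the state is unchanged.
	fmt.Println(n == s)

	n = s.next("run") // handled: a different state is returned.
	fmt.Println(n == s, n == state(running))
}
```

Adding a new state means adding a new type with its own methods, without touching the transitions of the existing states.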

Another important part of the automaton is events.

// event type.
type event int

// events constants.
const (
	run event = iota
	pause
	resume
	push
	measure
	cancel
)

// target defines end state for triggered event.
type target struct {
	state idleState  // target state.
	errc  chan error // channel with errors. closed when target state is reached.
}

// eventMessage is sent into automaton when user triggers event.
type eventMessage struct {
	event               // event type
	params    params    // new params
	components []string // id of components
	target              // target for this event
}

To understand why we need the target type, let’s consider a simple example. We create a new pipeline. It’s in the ready state. Now we start it with p.Run(). The run event is sent into the automaton and the pipeline transitions to the running state. How do we know when the pipeline is done? This is where the target type comes to the rescue. It tells which state we should wait for after the event. In our example, the pipeline will transition to the ready state after the job is done. The example is explained in the diagram:
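The code in this article calls dismiss and handle on a target and waits on its channel with pipe.Wait, but their bodies are not shown. The sketch below is my guess at how such helpers could behave, assuming that closing errc signals "target reached". The lowercase wait function, the method bodies and errPump are assumptions, not the library’s code.

```go
package main

import (
	"errors"
	"fmt"
)

// errPump is a hypothetical error used for the demonstration.
var errPump = errors.New("pump failed")

// target is a reduced version of the target type: only errc is kept.
type target struct {
	errc chan error
}

// dismiss closes the channel to signal that the target state is reached
// and returns an empty target. (Assumed behavior.)
func (t target) dismiss() target {
	if t.errc != nil {
		close(t.errc)
	}
	return target{}
}

// handle reports the error and then dismisses the target. (Assumed behavior.)
// It relies on errc having a buffer of at least one element.
func (t target) handle(err error) target {
	if t.errc != nil {
		t.errc <- err
	}
	return t.dismiss()
}

// wait blocks until errc is closed and returns the first error received, if any.
func wait(errc <-chan error) error {
	var first error
	for err := range errc {
		if first == nil {
			first = err
		}
	}
	return first
}

func main() {
	reached := target{errc: make(chan error, 1)}
	reached.dismiss() // the target state is reached without errors.
	fmt.Println(wait(reached.errc))

	failed := target{errc: make(chan error, 1)}
	failed.handle(errPump) // the transition failed.
	fmt.Println(wait(failed.errc))
}
```

With this shape the caller needs only one channel to learn both that the target state was reached and whether an error happened on the way.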

Now more details about the state types, specifically the idleState and activeState interfaces. Let’s have a look at listen(*Pipe, target) (state, target) for different types:

// listen for transition from ready state.
func (s idleReady) listen(p *Pipe, t target) (state, target) {
	return p.idle(s, t)
}

// listen for transition from running state.
func (s activeRunning) listen(p *Pipe, t target) (state, target) {
	return p.active(s, t)
}

pipe.Pipe has different methods for waiting for a transition! What’s in there?

// idle waits for transition from idle state. Listens only to the events channel.
func (p *Pipe) idle(s idleState, t target) (state, target) {
	if s == t.state || s == ready {
		t = t.dismiss()         // target reached, dismiss.
	}
	for {
		var newState state
		var err error
		select {
		case e := <-p.events:                   // waiting for event.
			newState, err = s.transition(p, e)  // call transition function.
			if err != nil {
				e.target.handle(err)
			} else if e.hasTarget() {
				t.dismiss()
				t = e.target
			}
		}
		if s != newState {
			return newState, t  // return, we have a new state.
		}
	}
}

// active waits for transition from active state. Listens events channel
// and channels needed for execution.
func (p *Pipe) active(s activeState, t target) (state, target) {
	for {
		var newState state
		var err error
		select {
		case e := <-p.events:                   // wait for event.
			newState, err = s.transition(p, e)  // do the transition.
			if err != nil {                     // successful transition?
				e.target.handle(err)            // no, send error.
			} else if e.hasTarget() {           // yes, have new target?
				t.dismiss()                     // cancel current one.
				t = e.target                    // replace with new.
			}
		case <-p.provide:                       // wait for new message request.
			newState = s.sendMessage(p)         // send new message.
		case err, ok := <-p.errc:               // wait for errors.
			if ok {
				interrupt(p.cancel)             // interrupt execution.
				t.handle(err)                   // send error.
			}
			return ready, t                     // go to ready state.
		}
		if s != newState {
			return newState, t  // exit if transition happened.
		}
	}
}

Thus, we can listen to different channels in different states. For instance, this allows us not to send messages during a pause: we simply don’t listen to the corresponding channel.
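Here is a stripped-down illustration of that idea. The idle and active functions below are hypothetical and much simpler than the methods above: events are plain strings and a request for a message is an empty struct. Buffered channels are used only to keep the demonstration deterministic.

```go
package main

import "fmt"

// idle listens to events only; requests on other channels stay unanswered.
func idle(events <-chan string) string {
	return <-events
}

// active listens to both events and message requests.
func active(events <-chan string, provide <-chan struct{}) string {
	select {
	case e := <-events:
		return e
	case <-provide:
		return "message"
	}
}

func main() {
	events := make(chan string, 1)
	provide := make(chan struct{}, 1)

	provide <- struct{}{}                // a component requests a message.
	fmt.Println(active(events, provide)) // the active state serves the request.

	provide <- struct{}{}     // another request arrives during the pause.
	events <- "resume"        // the user triggers an event.
	fmt.Println(idle(events)) // the idle state reacts to the event only.
	fmt.Println(len(provide)) // the request is still pending.
}
```

The pending request is not lost. It simply waits until the automaton enters a state that listens to the provide channel again.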

Constructor and automaton start

// New creates pipeline and starts automaton with ready as initial state.
func New(sampleRate phono.SampleRate, options ...Option) *Pipe {
	p := &Pipe{
		UID:        phono.NewUID(),
		sampleRate: sampleRate,
		log:        log.GetLogger(),
		processors: make([]*processRunner, 0),
		sinks:      make([]*sinkRunner, 0),
		metrics:    make(map[string]measurable),
		params:     make(map[string][]phono.ParamFunc),
		feedback:   make(map[string][]phono.ParamFunc),
		events:     make(chan eventMessage, 1), // events channel.
		cancel:     make(chan struct{}),        // cancellation channel.
		provide:    make(chan struct{}),
		consume:    make(chan message),
	}
	for _, option := range options {            // apply options.
		option(p)()
	}
	go p.loop()                                 // start automaton.
	return p
}

Apart from applying functional options, the constructor also starts the automaton’s loop in a separate goroutine. Let’s have a look:

// loop is executed until nil state is returned.
func (p *Pipe) loop() {
	var s state = ready         // initial state
	t := target{}
	for s != nil {
		s, t = s.listen(p, t)   // wait for transition into a new state.
		p.log.Debug(fmt.Sprintf("%v is %T", p, s))
	}
	t.dismiss()                 // dismiss last target.
	close(p.events)             // close events channel.
}

// listen waits for transition from ready state.
func (s idleReady) listen(p *Pipe, t target) (state, target) {
	return p.idle(s, t)
}

// transition returns new state depending on triggered event.
func (s idleReady) transition(p *Pipe, e eventMessage) (state, error) {
	switch e.event {
	case cancel:
		interrupt(p.cancel)
		return nil, nil
	case push:
		e.params.applyTo(p.ID())
		p.params = p.params.merge(e.params)
		return s, nil
	case measure:
		for _, id := range e.components {
			e.params.applyTo(id)
		}
		return s, nil
	case run:
		if err := p.start(); err != nil {
			return s, err
		}
		return running, nil
	}
	return s, ErrInvalidState
}

The pipeline is created and waiting for events.
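If you want to play with the loop in isolation, here is a runnable miniature of it. The machine type, its trace and done fields and newMachine are hypothetical. The running state finishes its "work" immediately and does not handle events, and targets are left out entirely.

```go
package main

import "fmt"

type event int

const (
	run event = iota
	cancel
)

// machine is a hypothetical, much smaller stand-in for pipe.Pipe.
type machine struct {
	events chan event
	done   chan struct{} // closed when the loop exits.
	trace  []string      // visited states, for illustration only.
}

// state is reduced to a single method that blocks until a transition happens.
type state interface {
	listen(m *machine) state
}

type (
	idleReady     struct{}
	activeRunning struct{}
)

var (
	ready   idleReady
	running activeRunning
)

// listen leaves the ready state only when an event arrives.
func (s idleReady) listen(m *machine) state {
	for {
		switch <-m.events {
		case run:
			return running
		case cancel:
			return nil // nil state stops the loop.
		}
	}
}

// listen leaves the running state when the work is done.
// There is no real work here, so it returns immediately.
func (s activeRunning) listen(m *machine) state {
	return ready
}

// loop is executed until nil state is returned.
func (m *machine) loop() {
	var s state = ready
	for s != nil {
		m.trace = append(m.trace, fmt.Sprintf("%T", s))
		s = s.listen(m)
	}
	close(m.done)
}

// newMachine creates the machine and starts its loop.
func newMachine() *machine {
	m := &machine{
		events: make(chan event, 1),
		done:   make(chan struct{}),
	}
	go m.loop()
	return m
}

func main() {
	m := newMachine()
	m.events <- run
	m.events <- cancel
	<-m.done             // wait until the loop exits.
	fmt.Println(m.trace) // ready, running, ready.
}
```

All state changes happen inside the single loop goroutine, so the states need no locks. The outside world talks to the automaton only through the events channel.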

It’s time to work

Call p.Run()!

// Run sends run event into pipeline.
// Execution of this method after pipe.Close causes panic.
func (p *Pipe) Run() chan error {
	runEvent := eventMessage{
		event: run,
		target: target{
			state: ready,               // target idle state.
			errc:  make(chan error, 1),
		},
	}
	p.events <- runEvent
	return runEvent.target.errc
}


// listen waits for transition from running state.
func (s activeRunning) listen(p *Pipe, t target) (state, target) {
	return p.active(s, t)
}

// transition returns new state depending on triggered event.
func (s activeRunning) transition(p *Pipe, e eventMessage) (state, error) {
	switch e.event {
	case cancel:
		interrupt(p.cancel)
		err := Wait(p.errc)
		return nil, err
	case measure:
		e.params.applyTo(p.ID())
		p.feedback = p.feedback.merge(e.params)
		return s, nil
	case push:
		e.params.applyTo(p.ID())
		p.params = p.params.merge(e.params)
		return s, nil
	case pause:
		return pausing, nil
	}
	return s, ErrInvalidState
}

// sendMessage generates new message.
func (s activeRunning) sendMessage(p *Pipe) state {
	p.consume <- p.newMessage()
	return s
}

The running state generates new messages and lasts until the pipeline is done.

Time for a pause

We can pause the pipeline during execution. In the paused state the pipe won’t generate new messages. To do this, we need to call p.Pause().

// Pause sends pause event into pipeline.
// Execution of this method after pipe.Close causes panic.
func (p *Pipe) Pause() chan error {
	pauseEvent := eventMessage{
		event: pause,
		target: target{
			state: paused,              // target idle state.
			errc:  make(chan error, 1),
		},
	}
	p.events <- pauseEvent
	return pauseEvent.target.errc
}

// listen waits for transition from pausing state.
func (s activePausing) listen(p *Pipe, t target) (state, target) {
	return p.active(s, t)
}

// transition returns new state depending on triggered event.
func (s activePausing) transition(p *Pipe, e eventMessage) (state, error) {
	switch e.event {
	case cancel:
		interrupt(p.cancel)
		err := Wait(p.errc)
		return nil, err
	case measure:
		e.params.applyTo(p.ID())
		p.feedback = p.feedback.merge(e.params)
		return s, nil
	case push:
		e.params.applyTo(p.ID())
		p.params = p.params.merge(e.params)
		return s, nil
	}
	return s, ErrInvalidState
}

// sendMessage generates new message. The message contains param function that is called
// when message is received. sendMessage blocks until all receivers get the message.
// Thus it's guaranteed that:
//  1. New messages are not generated
//  2. All components processed last message
func (s activePausing) sendMessage(p *Pipe) state {
	m := p.newMessage()
	if len(m.feedback) == 0 {
		m.feedback = make(map[string][]phono.ParamFunc)
	}
	var wg sync.WaitGroup
	wg.Add(len(p.sinks))    // add all sinks.
	for _, sink := range p.sinks {
		param := phono.ReceivedBy(&wg, sink.ID())   // param function.
		m.feedback = m.feedback.add(param)
	}
	p.consume <- m          // send message.
	wg.Wait()               // wait delivered.
	return paused
}

When all receivers have got the message, the pipeline transitions to the paused state. If the message is the last one, the pipe goes into the ready state.
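The trick with sync.WaitGroup deserves a separate illustration. In the sketch below the message carries one callback per receiver, and the sender blocks until every callback has been called. The message type, broadcast and sink are hypothetical simplifications; the callbacks play the role that phono.ReceivedBy plays above.

```go
package main

import (
	"fmt"
	"sync"
)

// message carries callbacks keyed by receiver id,
// a simplified stand-in for feedback params.
type message struct {
	received map[string]func()
}

// broadcast sends one message to every sink and blocks
// until all of them confirm the receipt.
func broadcast(sinks map[string]chan message) int {
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		confirmed int
	)
	m := message{received: make(map[string]func())}
	wg.Add(len(sinks)) // add all sinks.
	for id := range sinks {
		m.received[id] = func() {
			mu.Lock()
			confirmed++
			mu.Unlock()
			wg.Done()
		}
	}
	for _, in := range sinks {
		in <- m // send message.
	}
	wg.Wait() // wait until it is delivered to everyone.
	return confirmed
}

// sink confirms every message it receives.
func sink(id string, in <-chan message) {
	for m := range in {
		m.received[id]()
	}
}

func main() {
	sinks := map[string]chan message{
		"a": make(chan message),
		"b": make(chan message),
	}
	for id, in := range sinks {
		go sink(id, in)
	}
	fmt.Println(broadcast(sinks))
}
```

Because the confirmation travels inside the message itself, the sender does not need to know anything about how the receivers are implemented.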

Let’s work again!

To exit paused state, we need to call p.Resume().

// Resume sends resume event into pipeline.
// Execution of this method after pipe.Close causes panic.
func (p *Pipe) Resume() chan error {
	resumeEvent := eventMessage{
		event: resume,
		target: target{
			state: ready,              // target idle state.
			errc:  make(chan error, 1),
		},
	}
	p.events <- resumeEvent
	return resumeEvent.target.errc
}

// listen waits for transition from paused state.
func (s idlePaused) listen(p *Pipe, t target) (state, target) {
	return p.idle(s, t)
}

// transition returns new state depending on triggered event.
func (s idlePaused) transition(p *Pipe, e eventMessage) (state, error) {
	switch e.event {
	case cancel:
		interrupt(p.cancel)
		err := Wait(p.errc)
		return nil, err
	case push:
		e.params.applyTo(p.ID())
		p.params = p.params.merge(e.params)
		return s, nil
	case measure:
		for _, id := range e.components {
			e.params.applyTo(id)
		}
		return s, nil
	case resume:
		return running, nil
	}
	return s, ErrInvalidState
}

Everything is trivial here. The pipeline goes into the running state again.

Wrap it up

The pipeline can be stopped in any state. There is a p.Close() method for this.

// Close sends cancel event into pipeline.
// Subsequent execution of this method causes panic.
// Mandatory to call it to release resources.
func (p *Pipe) Close() chan error {
	closeEvent := eventMessage{
		event: cancel,
		target: target{
			state: nil,                 // no idle state to reach, automaton stops.
			errc:  make(chan error, 1),
		},
	}
	p.events <- closeEvent
	return closeEvent.target.errc
}
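The panic mentioned in the comments comes from Go itself: the loop closes the events channel when it exits, and sending to a closed channel panics. The sketch below demonstrates only this language behavior. The trySend helper is hypothetical and recovers from the panic purely to make it visible; I’m not suggesting that the library should do this.

```go
package main

import "fmt"

// trySend reports whether the event was sent.
// A send on a closed channel panics; the panic is recovered here.
func trySend(events chan string, e string) (sent bool) {
	defer func() {
		if recover() != nil {
			sent = false
		}
	}()
	events <- e
	return true
}

func main() {
	events := make(chan string, 1)

	fmt.Println(trySend(events, "run")) // the channel is open: true.
	<-events

	close(events)                          // the automaton has stopped.
	fmt.Println(trySend(events, "cancel")) // the channel is closed: false.
}
```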

Who needs this?

Definitely not everyone. To understand how to manage state, it’s essential to understand your problem first. There are exactly two conditions under which an event-driven state machine is worth using:

  1. Complex lifecycle - there are three or more states with non-linear transitions.
  2. Asynchronous execution.

Even though an event-driven automaton solves the problem, it’s still quite a complex abstraction. That’s why you should use it with maximum caution and only after fully understanding its pros and cons.