Building (μ)services in Go

applying TDD, the Hexagonal Architecture, DDD and CQS

29 January 2020

Gonzalo Serrano Revuelta

Lead SWE at Paack

$ whoami

2

Disclaimer

https://peter.bourgon.org/go-for-industrial-programming/
3

Trade-offs aka Pros & Cons

4

Context

Goal:

5

Microservices pros

6

Dist-sys cons

"A microservices-based application is a distributed system running on multiple processes or services, usually even across multiple servers or hosts. Each service instance is typically a process. Therefore, services must interact using an inter-process communication protocol such as HTTP, AMQP, or a binary protocol like TCP, depending on the nature of each service."

docs.microsoft.com

That means:

7

Microservices cons

Problems:

Solutions:

8

Pattern: servers/consumers idempotency

"[...] is the property of certain operations in mathematics and computer science whereby they can be applied multiple times without changing the result beyond the initial application" --Wikipedia

Our solution:

9

10

Pattern: decompose by subdomain

11

Decomposing the subdomain with DDD

12

Decomposing the subdomain with DDD

13

Package Design with Hexagonal Architecture

https://herbertograca.com/2017/11/16/explicit-architecture-01-ddd-hexagonal-onion-clean-cqrs-how-i-put-it-all-together/
14

Package Design (cont.)

Start with a few packages, driven by TDD, and expand when it feels natural.

Other considerations:

15

Example: our service directory tree

$ tree -d
.
├── app
├── cmd
│   └── delivery-system
├── config
├── doc
│   └── http-fixtures
├── domain
├── infra
│   ├── amqp
│   │   └── events
│   │       ├── v1
│   │       └── v2
│   ├── http
│   │   ├── graphql
│   │   └── jsonschema
│   ├── inmemory
│   └── sql
│       └── migrations
├── o11y
│   ├── iox
│   └── stackdriver
└── types
16

Example: our package dependency graph

$ go-dependency-graph -level=1 ./domain
./domain
 ├ github.com/PaackEng/delivery-system/types
 ├ github.com/google/uuid

$ go-dependency-graph -level=1 ./app
./app
 ├ github.com/PaackEng/delivery-system/domain
 ├ github.com/PaackEng/delivery-system/o11y
 ├ github.com/PaackEng/delivery-system/types

$ go-dependency-graph -level=1 ./infra
./infra
 ├ github.com/PaackEng/delivery-system/app
 ├ github.com/PaackEng/delivery-system/domain
 ├ github.com/PaackEng/delivery-system/infra/amqp/events/v1
 ├ github.com/PaackEng/delivery-system/infra/amqp/events/v2
 ├ github.com/PaackEng/delivery-system/types
 ├ github.com/gogo/protobuf/proto
 ├ github.com/golang/protobuf/ptypes
 ├ github.com/google/uuid
17

Some notes about how we do TDD

"The importance of tests are their utility" -- Marcos Quesada

TDD works great for:

Almost all the patterns and abstractions (mostly interfaces) that we have implemented were driven by tests and by studying their trade-offs, not because we thought they were cool.

18

A TDD workflow

19

Domain package

// An entity is the basic unit of state and behaviour of our domain.
// In a DDD context it has an ID and records events.
// This type is intended to be embedded in the domain entities.
type entity struct {
    id        string
    events    []types.Event
    createdAt time.Time
}

func (e *entity) Record(ev types.Event) {
    e.events = append(e.events, ev)
}
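
The command handler slide calls driver.Events() and driver.ClearEvents(), which are not shown here. A minimal sketch of how the embedded entity could provide them, assuming a reduced Event interface and a hypothetical `named` event type (both are stand-ins, not the real `types` package):

```go
package main

import "fmt"

// Event is a stand-in for types.Event, reduced to what this sketch needs.
type Event interface {
	Name() string
}

// named is a hypothetical event type used only in this example.
type named string

func (n named) Name() string { return string(n) }

// entity mirrors the slide's type, without the createdAt field.
type entity struct {
	id     string
	events []Event
}

// Record appends an event to the entity.
func (e *entity) Record(ev Event) { e.events = append(e.events, ev) }

// Events returns a copy, so callers cannot mutate the entity's own slice.
func (e *entity) Events() []Event {
	out := make([]Event, len(e.events))
	copy(out, e.events)
	return out
}

// ClearEvents drops every recorded event.
func (e *entity) ClearEvents() { e.events = nil }

// Movement embeds entity and so gains Record, Events and ClearEvents.
type Movement struct {
	entity
}

func main() {
	m := &Movement{entity: entity{id: "m-1"}}
	m.Record(named("MovementAssignedToRound"))
	fmt.Println(len(m.Events())) // 1
	m.ClearEvents()
	fmt.Println(len(m.Events())) // 0
}
```

Returning a copy from Events is a design choice of this sketch: it keeps the recorded list private to the entity.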
20

Events

// Event describes a change that happened.
//
// * Past tense e.g. RouteCreated.
// * Contains intent e.g. EmailChanged is better than EmailSet.
type Event interface {
    ID() string
    Name() string
    At() time.Time
    UserID() string
}

var _ Event = &BasicEvent{}

// BasicEvent is the minimal domain event struct.
// This type is intended to be embedded in the domain events.
type BasicEvent struct {
    IDAttr     string    `json:"id"`
    NameAttr   string    `json:"name"`
    AtAttr     time.Time `json:"at"`
    UserIDAttr string    `json:"user_id"`
}

// here are the methods that implement the interface
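
The elided methods are most likely plain getters. A sketch under that assumption follows; the NewBasicEvent constructor is inferred from the call types.NewBasicEvent(id, name) on the next slide, and RouteCreated is a hypothetical event added for illustration:

```go
package main

import (
	"fmt"
	"time"
)

// Event is the interface from the slide.
type Event interface {
	ID() string
	Name() string
	At() time.Time
	UserID() string
}

// BasicEvent is the struct from the slide.
type BasicEvent struct {
	IDAttr     string    `json:"id"`
	NameAttr   string    `json:"name"`
	AtAttr     time.Time `json:"at"`
	UserIDAttr string    `json:"user_id"`
}

var _ Event = &BasicEvent{}

// Value receivers let both BasicEvent and *BasicEvent satisfy Event.
func (e BasicEvent) ID() string     { return e.IDAttr }
func (e BasicEvent) Name() string   { return e.NameAttr }
func (e BasicEvent) At() time.Time  { return e.AtAttr }
func (e BasicEvent) UserID() string { return e.UserIDAttr }

// NewBasicEvent is an assumed constructor: it stamps the event with the
// current UTC time and leaves the user ID empty.
func NewBasicEvent(id, name string) BasicEvent {
	return BasicEvent{IDAttr: id, NameAttr: name, AtAttr: time.Now().UTC()}
}

// RouteCreated is a hypothetical domain event embedding BasicEvent.
type RouteCreated struct {
	BasicEvent
	RouteID string `json:"route_id"`
}

func main() {
	var ev Event = RouteCreated{
		BasicEvent: NewBasicEvent("e-1", "route_created"),
		RouteID:    "r-1",
	}
	fmt.Println(ev.ID(), ev.Name()) // e-1 route_created
}
```

Because the methods are promoted through embedding, each domain event only declares its own fields.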
21

Recording events example

// AssignRound stores the ID of the round.
func (m *Movement) AssignRound(round Round) {
    roundID := round.ID()
    if types.StringValue(m.roundID) == roundID {
        // idempotent assign
        return
    }
    m.roundID = &roundID
    m.Record(MovementAssignedToRound{
        BasicEvent: types.NewBasicEvent(uuid.New().String(), EventNameMovementAssignedToRoundV1),
        MovementID: m.ID(),
        RoundID:    roundID,
    })
}

How the events are retrieved and sent to a message broker is described later.

22

Application package contents

$ ls app/*.go | grep -v -E 'test|mock'
app.go
command_handlers.go
event_handlers.go
events_bus.go
events_worker.go
query_handlers.go
repositories.go
transaction.go
23

Application interfaces

"Go interfaces generally belong in the package that uses values of the interface type, not the package that implements those values"

https://www.efekarakus.com/golang/2019/12/29/working-with-interfaces-in-go.html

"I use interfaces when I have more than one implementation, and I always have 2 implementations for testing purposes: the real one and the mock" -- Peter Bourgon

Example from app/repositories.go:

type RoundsRepository interface {
    // Insert must be idempotent (i.e. insert only if it does not exist yet)
    Insert(tx Tx, round domain.Round) error
    Update(tx Tx, round domain.Round) error
    FindAll(ctx context.Context) ([]domain.Round, error)
    FindByID(ctx context.Context, ID string) (domain.Round, error)
}
24

The Command Handler interface

Emerged from the need to reliably persist entity changes and the events generated by those changes while avoiding distributed transactions/2PC (see the Transactional Outbox pattern).

// CommandHandler is the interface for running transactional logic based on a command input.
type CommandHandler interface {
    Handle(ctx context.Context, tx Tx, cmd Command) ([]types.Event, error)
}

// Command is the interface to identify the DTO for a given command by name.
type Command interface {
    Name() string
}

// Tx is the interface to execute database transaction operations.
type Tx interface {
    Commit() error
    Rollback() error
}
25

Command handler example

func (cd CreateDriver) Handle(ctx context.Context, tx Tx, cmd Command) ([]types.Event, error) {
    c, ok := cmd.(CreateDriverCmd)
    if !ok {
        return nil, ErrInvalidCommand{CreateDriverCmdName, cmd.Name()}
    }

    driver, err := domain.NewDriver(c.DriverID, c.DriverName, c.Vehicle)
    if err != nil {
        return nil, NewErrOp(errMsgCreateDriver, err)
    }

    if err := cd.driversRepository.Insert(tx, driver); err != nil {
        if !errors.Is(err, ErrDuplicatedID) {
            return nil, NewErrOp(errMsgCreateDriver, err)
        }
        // idempotency case for duplicated id
        driver.ClearEvents()
    }

    return driver.Events(), nil
}

Note: events persistence and transactionality are handled in middlewares.

26

The Query Handler interface

Emerged from the need to add observability to read operations, as a query handler middleware. It also makes it easy to add other middlewares, e.g. one that caches query results with a TTL.

// Query is the interface to identify the DTO for a given query by name.
type Query interface {
    Name() string
}

// QueryResult is a generic query result type.
type QueryResult interface{}

// QueryHandler is the interface for handling queries.
type QueryHandler interface {
    Handle(ctx context.Context, query Query) (QueryResult, error)
}

// QueryHandlerMiddleware is a type for decorating QueryHandlers
type QueryHandlerMiddleware func(h QueryHandler) QueryHandler
27

Application interfaces for handling events

// A BrokerMessage has a name and data in its body.
type BrokerMessage interface {
    Name() string
    Body() []byte
}

// A ConsumedBrokerMessage is a BrokerMessage that comes from the broker and can be acknowledged, discarded or retried.
type ConsumedBrokerMessage interface {
    BrokerMessage
    Acknowledge() error
    Discard() error
    Retry() error
}

// A MessageBroker is the interface implemented by types that can produce and consume messages.
type MessageBroker interface {
    Produce(ctx context.Context, msg BrokerMessage) error
    Consume() <-chan ConsumedBrokerMessage
}

// EventsMarshaler is the interface implemented by types that can marshal and unmarshal events.
type EventsMarshaler interface {
    Marshal(event types.Event) ([]byte, error)
    Unmarshal(data []byte, event types.Event) error
}
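
A worker that drains Consume() has to settle every message one way or another. A sketch of that loop, assuming a hypothetical ErrUnknownEvent sentinel for messages that can never succeed and a fake message type for the demonstration:

```go
package main

import (
	"errors"
	"fmt"
)

type BrokerMessage interface {
	Name() string
	Body() []byte
}

type ConsumedBrokerMessage interface {
	BrokerMessage
	Acknowledge() error
	Discard() error
	Retry() error
}

// ErrUnknownEvent is a hypothetical sentinel for messages that will never
// be handled successfully, so retrying them is pointless.
var ErrUnknownEvent = errors.New("unknown event")

// Work handles messages until the channel is closed. Settlement errors are
// ignored here for brevity; real code would at least log them.
func Work(messages <-chan ConsumedBrokerMessage, handle func(BrokerMessage) error) {
	for msg := range messages {
		err := handle(msg)
		switch {
		case err == nil:
			_ = msg.Acknowledge()
		case errors.Is(err, ErrUnknownEvent):
			_ = msg.Discard()
		default:
			_ = msg.Retry()
		}
	}
}

// fakeMsg records how it was settled.
type fakeMsg struct {
	name    string
	outcome string
}

func (m *fakeMsg) Name() string       { return m.name }
func (m *fakeMsg) Body() []byte       { return nil }
func (m *fakeMsg) Acknowledge() error { m.outcome = "ack"; return nil }
func (m *fakeMsg) Discard() error     { m.outcome = "discard"; return nil }
func (m *fakeMsg) Retry() error       { m.outcome = "retry"; return nil }

// route is a toy handler keyed on the message name.
func route(msg BrokerMessage) error {
	switch msg.Name() {
	case "driver_created":
		return nil
	case "mystery":
		return fmt.Errorf("route %q: %w", msg.Name(), ErrUnknownEvent)
	default:
		return errors.New("temporary failure")
	}
}

func main() {
	msgs := []*fakeMsg{{name: "driver_created"}, {name: "mystery"}, {name: "flaky"}}
	ch := make(chan ConsumedBrokerMessage, len(msgs))
	for _, m := range msgs {
		ch <- m
	}
	close(ch)
	Work(ch, route)
	for _, m := range msgs {
		fmt.Println(m.name, m.outcome)
	}
}
```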
28

Errors are APIs

The APIs are not just the interfaces but also the errors returned by their methods.

// Errors are part of the app layer API. They can also be used in the infra layer.
var (
    ErrDriverNotFound   = errors.New("driver not found")
    ErrMovementNotFound = errors.New("movement not found")
    ErrPackageNotFound  = errors.New("package not found")
    ErrRoundNotFound    = errors.New("round not found")
    ErrDuplicatedID     = errors.New("duplicated id")
)

// ErrRepository is the type that wraps all repository failures. It must be
// used by every repository implementation. Code identifies the failure so
// that sensitive data is not exposed to the user.
type ErrRepository struct {
    error
    Code string
}

// Unwrap returns the underlying error
func (e ErrRepository) Unwrap() error {
    return errors.Unwrap(e.error)
}
29

Infrastructure

Mainly implements the application interfaces and defines the ports (e.g. http.Server).

├── infra
│   ├── amqp
│   │   └── events
│   │       ├── v1
│   │       └── v2
│   ├── http
│   │   ├── graphql
│   │   └── jsonschema
│   ├── inmemory
│   └── sql
│       └── migrations
30

About the Event Driven Architecture

31

RabbitMQ topology

Paack's RabbitMQ topology based on CodelyTV's approach (by Joan Maeso)
32

Infrastructure: AMQP

A wrapper of github.com/streadway/amqp

// Produce sends the message to the produceExchange via channel.
func (b MessageBroker) Produce(ctx context.Context, msg app.BrokerMessage) error {
    exchange := b.conf.ProduceExchange
    routingKey := msg.Name()
    // TODO: delivery mode, priority opts?
    publishOpts := map[string]interface{}{
        "messageId": msg.Name(),
    }

    if err := b.produceChan.Publish(exchange, routingKey, msg.Body(), publishOpts); err != nil {
        return err
    }
    _ = o11y.FromContext(ctx).Count(ctx, PublishCounterName, 1, o11y.NewTag("exchange", exchange))

    return nil
}


// Consume returns the channel from which consumed messages are received.
func (b MessageBroker) Consume() <-chan app.ConsumedBrokerMessage {
    return b.consumedMessagesChan
}
33

Infrastructure: sql

A wrapper of upper.io/db.v3/postgresql

func (r DriversRepository) Insert(tx app.Tx, domainDriver domain.Driver) error {
    upperioTx, ok := tx.(sqlbuilder.Tx)
    if !ok {
        return app.NewDriversRepoError(errMsgWrongTxType, nil)
    }

    var d driver
    driversCollection := upperioTx.Collection(driversName)
    err := driversCollection.Find(db.Cond{"id": domainDriver.ID()}).One(&d)
    if err == nil {
        return app.NewDriversRepoError(errMsgInsert, app.ErrDuplicatedID)
    }
    if !errors.Is(err, db.ErrNoMoreRows) {
        return app.NewDriversRepoError(errMsgInsert, err)
    }

    driver := newDriver(domainDriver)
    if _, err := driversCollection.Insert(driver); err != nil {
        return app.NewDriversRepoError(errMsgInsert, err)
    }
    return nil
}
34

o[bservabilit]y aka o11y

35

o11y: Logs

There are more than 50 libs in the awesome-go repo.

Dave Cheney: logging libs are too complex

A simple implementation:

type Tag struct {
    Name  string
    Value string
}

type Logger interface {
    Debug(message string, tags ...Tag)
    Info(message string, tags ...Tag)
}

Tons of services: Elasticsearch, Graylog, Rollbar, AWS CloudWatch Logs, GCP Stackdriver Logs (integrated with GKE)...

36

o11y: Metrics

// TODO: Gauges, Histograms
type Metrics interface {
    Count(ctx context.Context, name string, value int64, tags ...Tag) error
}

Implementations: StatsD, Prometheus, AWS CloudWatch Custom Metrics, GCP Stackdriver Custom Metrics, New Relic Custom Metrics...

Go libs:

37

o11y: Observer

Simple interface aggregation that will be passed all around our code via context.

// TODO: tracing
type Observer interface {
    Logger
    Metrics
}

The RED Method:

Examples:

38

More on errors

Error wrapping was introduced in Go 1.13:

Wrapped error err.Error() result:

graphql: transaction: events store: op create a round: movement IDs can't be empty

Application example:

// infra/http/graphql package
func (r *queryResolver) Round(ctx context.Context, roundID string) (*Round, error) {
    result, err := r.GetRound.Handle(ctx, app.RoundQuery{RoundID: roundID})
    if err != nil {
        if errors.Is(err, app.ErrRoundNotFound) {
            return nil, nil
        }
        return nil, newErrGraphQL(err)
    }
    // rest
}

Example: GraphQL error typing (Elm is good at types too).

39

Schemas and codegen

Our uses:

Custom shell scripts and Makefile rules to generate the code and place it in the correct package.

40

Simple main func

Create context and observer, log start/stop and call run().

func main() {
    ctx := context.Background()

    // o11y: set it up first because we want to inject it into the context
    observer := observerFromEnv(ctx, config.Env)
    observer.Info("system start")
    defer observer.Info("system stop")

    readyCh := make(chan struct{})
    go func() {
        <-readyCh
        observer.Info("system ready to serve")
    }()

    ctx = o11y.WithObserver(ctx, observer)
    err := run(ctx, config, readyCh)
    if err != nil {
        observer.Info("system error: " + err.Error())
    }
}
41

Run func & dependency injection

DI types: compile time (e.g. Google's Wire) vs. runtime.

Our run function builds the dependency graph at startup and is tested end to end through the HTTP port.

42

Context for dependency injection

type ctxKey struct{}

// WithObserver returns the provided context with an observer value.
func WithObserver(ctx context.Context, o Observer) context.Context {
    return context.WithValue(ctx, ctxKey{}, o)
}

// FromContext returns an Observer from the provided context.
func FromContext(ctx context.Context) Observer {
    observer, _ := ctx.Value(ctxKey{}).(Observer)
    return observer
}
43

Tip: context for http.Handler requests

Use the http.Server.BaseContext field to provide the base for each http.Handler's req.Context():

func NewServer(baseCtx context.Context, handler http.Handler) *http.Server {
    server := &http.Server{
        Handler: handler,
    }

    // serve requests with our own context
    server.BaseContext = func(ln net.Listener) context.Context {
        return baseCtx
    }

    return server
}
44

Some tooling used

Documentation:

45

Inspiration

46

Thank you

Gonzalo Serrano Revuelta

Lead SWE at Paack
