Building (μ)services in Go
applying TDD, the Hexagonal Architecture, DDD and CQS
29 January 2020
Gonzalo Serrano Revuelta
Lead SWE at Paack
Goal:
"A microservices-based application is a distributed system running on multiple processes or services, usually even across multiple servers or hosts. Each service instance is typically a process. Therefore, services must interact using an inter-process communication protocol such as HTTP, AMQP, or a binary protocol like TCP, depending on the nature of each service."
That means:
Problems:
Solutions:
"[...] is the property of certain operations in mathematics and computer science whereby they can be applied multiple times without changing the result beyond the initial application" --Wikipedia
Our solution:
Start with few packages with TDD, expand when it feels natural.
application and domain, with unit tests as entry point
infrastructure, with integration tests (with dockerized infra + build tags)
main in cmd/your-service-name, with e2e test
Other considerations:
types (cross-package types), config (because it's an aggregation of configs from other packages), etc
No internal yet; maybe we do in the future.
$ tree -d
.
├── app
├── cmd
│   └── delivery-system
├── config
├── doc
│   └── http-fixtures
├── domain
├── infra
│   ├── amqp
│   │   └── events
│   │       ├── v1
│   │       └── v2
│   ├── http
│   │   ├── graphql
│   │   └── jsonschema
│   ├── inmemory
│   └── sql
│       └── migrations
├── o11y
│   ├── iox
│   └── stackdriver
└── types
$ go-dependency-graph -level=1 ./domain
./domain
├ github.com/PaackEng/delivery-system/types
├ github.com/google/uuid

$ go-dependency-graph -level=1 ./app
./app
├ github.com/PaackEng/delivery-system/domain
├ github.com/PaackEng/delivery-system/o11y
├ github.com/PaackEng/delivery-system/types

$ go-dependency-graph -level=1 ./infra
./infra
├ github.com/PaackEng/delivery-system/app
├ github.com/PaackEng/delivery-system/domain
├ github.com/PaackEng/delivery-system/infra/amqp/events/v1
├ github.com/PaackEng/delivery-system/infra/amqp/events/v2
├ github.com/PaackEng/delivery-system/types
├ github.com/gogo/protobuf/proto
├ github.com/golang/protobuf/ptypes
├ github.com/google/uuid
"The importance of tests are their utility" -- Marcos Quesada
TDD works great for:
Almost all the patterns and abstractions (mostly interfaces) that we have implemented were driven by tests and by studying their trade-offs, not because we thought they were cool.
// An entity is the basic unit of state and behaviour of our domain.
// In a DDD context it has an ID and records events.
// This type is intended to be embedded in the domain entities.
type entity struct {
	id        string
	events    []types.Event
	createdAt time.Time
}

func (e *entity) Record(ev types.Event) {
	e.events = append(e.events, ev)
}
// Event describes a change that happened.
//
// * Past tense e.g. RouteCreated.
// * Contains intent e.g. EmailChanged is better than EmailSet.
type Event interface {
	ID() string
	Name() string
	At() time.Time
	UserID() string
}

var _ Event = &BasicEvent{}

// BasicEvent is the minimal domain event struct.
// This type is intended to be embedded in the domain events.
type BasicEvent struct {
	IDAttr     string    `json:"id"`
	NameAttr   string    `json:"name"`
	AtAttr     time.Time `json:"at"`
	UserIDAttr string    `json:"user_id"`
}

// here are the methods that implement the interface
// AssignRound stores the ID of the round.
func (m *Movement) AssignRound(round Round) {
	roundID := round.ID()
	if types.StringValue(m.roundID) == roundID {
		// idempotent assign
		return
	}

	m.roundID = &roundID
	m.Record(MovementAssignedToRound{
		BasicEvent: types.NewBasicEvent(uuid.New().String(), EventNameMovementAssignedToRoundV1),
		MovementID: m.ID(),
		RoundID:    roundID,
	})
}
How the events are retrieved and sent to a message broker is described later.
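The command handler shown later calls Events() and ClearEvents() on an entity, but those methods are not on the slides. A minimal sketch of how they could look, assuming a trimmed-down Event interface and a hypothetical namedEvent type (neither is the project's real code):

```go
package main

import "fmt"

// Event is a trimmed-down stand-in for types.Event (assumption).
type Event interface {
	Name() string
}

// namedEvent is a hypothetical event used only for this example.
type namedEvent string

func (e namedEvent) Name() string { return string(e) }

// entity mirrors the embedded type from the slides, reduced to its event list.
type entity struct {
	events []Event
}

// Record appends an event to the entity.
func (e *entity) Record(ev Event) { e.events = append(e.events, ev) }

// Events returns a copy of the recorded events so that callers cannot
// mutate the entity's own slice.
func (e *entity) Events() []Event {
	out := make([]Event, len(e.events))
	copy(out, e.events)
	return out
}

// ClearEvents drops the recorded events, e.g. when an operation turned out
// to be a no-op.
func (e *entity) ClearEvents() { e.events = nil }

// Movement embeds entity and so gets Record, Events and ClearEvents.
type Movement struct {
	entity
	roundID string
}

// AssignRound records an event only when the round actually changes.
func (m *Movement) AssignRound(roundID string) {
	if m.roundID == roundID {
		return // idempotent assign
	}
	m.roundID = roundID
	m.Record(namedEvent("MovementAssignedToRound"))
}

func main() {
	m := &Movement{}
	m.AssignRound("round-1")
	m.AssignRound("round-1") // the second call records nothing
	fmt.Println(len(m.Events())) // prints 1
	m.ClearEvents()
	fmt.Println(len(m.Events())) // prints 0
}
```

Returning a copy from Events() is a design choice of this sketch; returning the slice directly would also work if callers are trusted not to modify it.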
$ ls app/*.go | grep -v -E 'test|mock'
app.go
command_handlers.go
event_handlers.go
events_bus.go
events_worker.go
query_handlers.go
repositories.go
transaction.go
"Go interfaces generally belong in the package that uses values of the interface type, not the package that implements those values"
"I use interfaces when I have more than one implementation, and I always have 2 implementations for testing purposes: the real one and the mock" -- Peter Bourgon
Example from app/repositories.go:
type RoundsRepository interface {
	// Insert must be idempotent (aka: insert if not exist)
	Insert(tx Tx, round domain.Round) error
	Update(tx Tx, round domain.Round) error
	FindAll(ctx context.Context) ([]domain.Round, error)
	FindByID(ctx context.Context, ID string) (domain.Round, error)
}
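Having a second implementation for tests is what justifies the interface. The following is a sketch of a map-backed repository under some assumptions: Round is a simplified struct instead of domain.Round, the transaction argument is ignored, and InMemoryRounds is a hypothetical name rather than the project's inmemory package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Round is a simplified stand-in for domain.Round (assumption).
type Round struct {
	ID   string
	Name string
}

// Tx is a stand-in for app.Tx.
type Tx interface {
	Commit() error
	Rollback() error
}

// ErrRoundNotFound mirrors the app-layer sentinel error.
var ErrRoundNotFound = errors.New("round not found")

// InMemoryRounds is a hypothetical map-backed repository. It ignores tx.
type InMemoryRounds struct {
	mu     sync.RWMutex
	rounds map[string]Round
}

// NewInMemoryRounds returns an empty repository.
func NewInMemoryRounds() *InMemoryRounds {
	return &InMemoryRounds{rounds: make(map[string]Round)}
}

// Insert stores the round only if its ID is not present yet, so repeating
// the same insert has no further effect.
func (r *InMemoryRounds) Insert(_ Tx, round Round) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.rounds[round.ID]; !ok {
		r.rounds[round.ID] = round
	}
	return nil
}

// Update replaces an existing round or reports that it does not exist.
func (r *InMemoryRounds) Update(_ Tx, round Round) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.rounds[round.ID]; !ok {
		return ErrRoundNotFound
	}
	r.rounds[round.ID] = round
	return nil
}

// FindByID returns the round with the given ID.
func (r *InMemoryRounds) FindByID(_ context.Context, id string) (Round, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	round, ok := r.rounds[id]
	if !ok {
		return Round{}, ErrRoundNotFound
	}
	return round, nil
}

// demo inserts the same ID twice and returns the stored name.
func demo() (string, error) {
	repo := NewInMemoryRounds()
	_ = repo.Insert(nil, Round{ID: "r1", Name: "first"})
	_ = repo.Insert(nil, Round{ID: "r1", Name: "second"}) // no-op
	round, err := repo.FindByID(context.Background(), "r1")
	return round.Name, err
}

func main() {
	name, err := demo()
	fmt.Println(name, err) // prints "first <nil>"
}
```

FindAll is left out to keep the sketch short; it would copy the map values into a slice under the read lock.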
Emerged from the need to gather the events from the entities and to persist the entity changes and the events in the same transaction, while avoiding distributed transactions/2PC (see the Transactional Outbox pattern).
// CommandHandler is the interface for handling commands.
type CommandHandler interface {
	Handle(ctx context.Context, tx Tx, cmd Command) ([]types.Event, error)
}

// Command is the interface to identify the DTO for a given command by name.
type Command interface {
	Name() string
}

// Tx is the interface to execute database transaction operations.
type Tx interface {
	Commit() error
	Rollback() error
}
func (cd CreateDriver) Handle(ctx context.Context, tx Tx, cmd Command) ([]types.Event, error) {
	c, ok := cmd.(CreateDriverCmd)
	if !ok {
		return nil, ErrInvalidCommand{CreateDriverCmdName, cmd.Name()}
	}

	driver, err := domain.NewDriver(c.DriverID, c.DriverName, c.Vehicle)
	if err != nil {
		return nil, NewErrOp(errMsgCreateDriver, err)
	}

	if err := cd.driversRepository.Insert(tx, driver); err != nil {
		if !errors.Is(err, ErrDuplicatedID) {
			return nil, NewErrOp(errMsgCreateDriver, err)
		}
		// idempotency case for duplicated id
		driver.ClearEvents()
	}

	return driver.Events(), nil
}
Note: events persistence and transactionality are handled in middlewares.
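The middleware itself is not shown in the slides. As a rough sketch of the idea, the decorator below opens a transaction, runs the wrapped handler, stores the returned events with the same transaction and then commits, rolling back on any failure. Transactional, EventsStore, CommandHandlerFunc and the begin callback are hypothetical names, and Event is reduced to a single method.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type Event interface{ Name() string }
type Command interface{ Name() string }

type Tx interface {
	Commit() error
	Rollback() error
}

type CommandHandler interface {
	Handle(ctx context.Context, tx Tx, cmd Command) ([]Event, error)
}

// CommandHandlerFunc adapts a function to CommandHandler (hypothetical helper).
type CommandHandlerFunc func(ctx context.Context, tx Tx, cmd Command) ([]Event, error)

func (f CommandHandlerFunc) Handle(ctx context.Context, tx Tx, cmd Command) ([]Event, error) {
	return f(ctx, tx, cmd)
}

// EventsStore is a hypothetical outbox that persists events using the given tx.
type EventsStore interface {
	Append(tx Tx, events []Event) error
}

// Transactional wraps next so that entity changes and events share one tx.
func Transactional(begin func() (Tx, error), store EventsStore, next CommandHandler) CommandHandler {
	return CommandHandlerFunc(func(ctx context.Context, _ Tx, cmd Command) ([]Event, error) {
		tx, err := begin()
		if err != nil {
			return nil, fmt.Errorf("transaction: %w", err)
		}
		events, err := next.Handle(ctx, tx, cmd)
		if err == nil {
			err = store.Append(tx, events)
		}
		if err != nil {
			_ = tx.Rollback()
			return nil, fmt.Errorf("transaction: %w", err)
		}
		if err := tx.Commit(); err != nil {
			return nil, fmt.Errorf("transaction: %w", err)
		}
		return events, nil
	})
}

// fakeTx records how the transaction ended.
type fakeTx struct{ committed, rolledBack bool }

func (t *fakeTx) Commit() error   { t.committed = true; return nil }
func (t *fakeTx) Rollback() error { t.rolledBack = true; return nil }

// memStore keeps appended events in memory.
type memStore struct{ events []Event }

func (s *memStore) Append(_ Tx, evs []Event) error {
	s.events = append(s.events, evs...)
	return nil
}

type event string

func (e event) Name() string { return string(e) }

type command string

func (c command) Name() string { return string(c) }

// runDemo wires the middleware around a handler that succeeds or fails.
func runDemo(fail bool) (committed, rolledBack bool, stored int) {
	tx, store := &fakeTx{}, &memStore{}
	inner := CommandHandlerFunc(func(context.Context, Tx, Command) ([]Event, error) {
		if fail {
			return nil, errors.New("boom")
		}
		return []Event{event("DriverCreated")}, nil
	})
	h := Transactional(func() (Tx, error) { return tx, nil }, store, inner)
	_, _ = h.Handle(context.Background(), nil, command("CreateDriver"))
	return tx.committed, tx.rolledBack, len(store.events)
}

func main() {
	fmt.Println(runDemo(false)) // prints "true false 1"
	fmt.Println(runDemo(true))  // prints "false true 0"
}
```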
Emerged from the need to add observability to the read operations, as a query handler middleware. We will probably add a cache middleware soon.
// Query is the interface to identify the DTO for a given query by name.
type Query interface {
	Name() string
}

// QueryResult is a generic query result type.
type QueryResult interface{}

// QueryHandler is the interface for handling queries.
type QueryHandler interface {
	Handle(ctx context.Context, query Query) (QueryResult, error)
}

// QueryHandlerMiddleware is a type for decorating QueryHandlers
type QueryHandlerMiddleware func(h QueryHandler) QueryHandler
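To illustrate how a QueryHandlerMiddleware can add observability, here is a sketch of a decorator that reports the query name, duration and error. The Observed and Chain functions, the QueryHandlerFunc adapter and the report callback are assumptions made for this example; in the real service the reporting would presumably go through the observer.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type Query interface{ Name() string }
type QueryResult interface{}

type QueryHandler interface {
	Handle(ctx context.Context, query Query) (QueryResult, error)
}

type QueryHandlerMiddleware func(h QueryHandler) QueryHandler

// QueryHandlerFunc adapts a function to QueryHandler (hypothetical helper).
type QueryHandlerFunc func(ctx context.Context, query Query) (QueryResult, error)

func (f QueryHandlerFunc) Handle(ctx context.Context, query Query) (QueryResult, error) {
	return f(ctx, query)
}

// Observed returns a middleware that reports every handled query through
// report, a hypothetical callback standing in for metrics and logs.
func Observed(report func(name string, took time.Duration, err error)) QueryHandlerMiddleware {
	return func(next QueryHandler) QueryHandler {
		return QueryHandlerFunc(func(ctx context.Context, q Query) (QueryResult, error) {
			start := time.Now()
			res, err := next.Handle(ctx, q)
			report(q.Name(), time.Since(start), err)
			return res, err
		})
	}
}

// Chain applies the middlewares so that the first one is the outermost.
func Chain(h QueryHandler, mws ...QueryHandlerMiddleware) QueryHandler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// roundQuery is a hypothetical query DTO.
type roundQuery struct{ RoundID string }

func (roundQuery) Name() string { return "round" }

// demo runs one query through the middleware and returns the result and
// the names that were reported.
func demo() (QueryResult, []string) {
	var seen []string
	inner := QueryHandlerFunc(func(_ context.Context, q Query) (QueryResult, error) {
		return "round " + q.(roundQuery).RoundID, nil
	})
	h := Chain(inner, Observed(func(name string, _ time.Duration, _ error) {
		seen = append(seen, name)
	}))
	res, _ := h.Handle(context.Background(), roundQuery{RoundID: "42"})
	return res, seen
}

func main() {
	res, seen := demo()
	fmt.Println(res, seen) // prints "round 42 [round]"
}
```

A cache middleware would have the same shape: check a store before calling next.Handle and fill it afterwards.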
// A BrokerMessage has a name and data in its body.
type BrokerMessage interface {
	Name() string
	Body() []byte
}

// A ConsumedBrokerMessage is a BrokerMessage that comes from the broker and can be acknowledged.
type ConsumedBrokerMessage interface {
	BrokerMessage
	Acknowledge() error
	Discard() error
	Retry() error
}

// A MessageBroker is the interface implemented by types that can produce and consume messages.
type MessageBroker interface {
	Produce(msg BrokerMessage) error
	Consume() <-chan ConsumedBrokerMessage
}

// EventsMarshaler is the interface implemented by types that can marshal and unmarshal events.
type EventsMarshaler interface {
	Marshal(event types.Event) ([]byte, error)
	Unmarshal(data []byte, event types.Event) error
}
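For unit tests, the MessageBroker interface can be satisfied by a buffered channel. The sketch below is one possible shape, not the project's inmemory broker: InMemoryBroker, ErrBufferFull, NewMessage and the retry-by-re-enqueue behaviour are all assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

type BrokerMessage interface {
	Name() string
	Body() []byte
}

type ConsumedBrokerMessage interface {
	BrokerMessage
	Acknowledge() error
	Discard() error
	Retry() error
}

type MessageBroker interface {
	Produce(msg BrokerMessage) error
	Consume() <-chan ConsumedBrokerMessage
}

// ErrBufferFull is a hypothetical error returned instead of blocking.
var ErrBufferFull = errors.New("in-memory broker: buffer full")

// InMemoryBroker is a hypothetical channel-backed broker for tests.
type InMemoryBroker struct {
	ch chan ConsumedBrokerMessage
}

var _ MessageBroker = (*InMemoryBroker)(nil)

// NewInMemoryBroker returns a broker that buffers up to size messages.
func NewInMemoryBroker(size int) *InMemoryBroker {
	return &InMemoryBroker{ch: make(chan ConsumedBrokerMessage, size)}
}

// Produce enqueues msg, or fails when the buffer is full.
func (b *InMemoryBroker) Produce(msg BrokerMessage) error {
	select {
	case b.ch <- consumed{BrokerMessage: msg, broker: b}:
		return nil
	default:
		return ErrBufferFull
	}
}

// Consume returns the channel of consumed messages.
func (b *InMemoryBroker) Consume() <-chan ConsumedBrokerMessage { return b.ch }

// consumed wraps a produced message with the acknowledgement methods.
type consumed struct {
	BrokerMessage
	broker *InMemoryBroker
}

func (c consumed) Acknowledge() error { return nil }
func (c consumed) Discard() error     { return nil }

// Retry puts the message back at the end of the queue.
func (c consumed) Retry() error { return c.broker.Produce(c.BrokerMessage) }

// message is a minimal BrokerMessage.
type message struct {
	name string
	body []byte
}

func (m message) Name() string { return m.name }
func (m message) Body() []byte { return m.body }

// NewMessage builds a message from a name and a string body.
func NewMessage(name, body string) BrokerMessage {
	return message{name: name, body: []byte(body)}
}

func main() {
	broker := NewInMemoryBroker(8)
	_ = broker.Produce(NewMessage("RoundCreated", `{"id":"42"}`))
	msg := <-broker.Consume()
	fmt.Println(msg.Name(), string(msg.Body())) // prints RoundCreated {"id":"42"}
	_ = msg.Acknowledge()
}
```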
The APIs are not just the interfaces but also the errors returned by their methods.
// Errors are part of the app layer API. They can also be used in the infra layer.
var (
	ErrDriverNotFound   = errors.New("driver not found")
	ErrMovementNotFound = errors.New("movement not found")
	ErrPackageNotFound  = errors.New("package not found")
	ErrRoundNotFound    = errors.New("round not found")
	ErrDuplicatedID     = errors.New("duplicated id")
)

// ErrRepository is the type to wrap all the repository failures. It must be
// used by all its implementations. The code is used to prevent sensitive data
// from being read by the user.
type ErrRepository struct {
	error
	Code string
}

// Unwrap returns the underlying error
func (e ErrRepository) Unwrap() error {
	return errors.Unwrap(e.error)
}
Mainly implements the application interfaces and defines the ports (e.g. http.Server).
├── infra
│   ├── amqp
│   │   └── events
│   │       ├── v1
│   │       └── v2
│   ├── http
│   │   ├── graphql
│   │   └── jsonschema
│   ├── inmemory
│   └── sql
│       └── migrations
amqp: RabbitMQ broker
http: handlers, mux
http/graphql: queries and mutation resolvers
http/jsonschema: DTOs for REST API
inmemory: repositories, broker
sql: PostgreSQL repositories
A wrapper of github.com/streadway/amqp
// Produce sends the message to the produceExchange via channel.
func (b MessageBroker) Produce(ctx context.Context, msg app.BrokerMessage) error {
	exchange := b.conf.ProduceExchange
	routingKey := msg.Name()
	// TODO: delivery mode, priority opts?
	publishOpts := map[string]interface{}{
		"messageId": msg.Name(),
	}

	if err := b.produceChan.Publish(exchange, routingKey, msg.Body(), publishOpts); err != nil {
		return err
	}

	_ = o11y.FromContext(ctx).Count(ctx, PublishCounterName, 1, o11y.NewTag("exchange", exchange))

	return nil
}

// Consume returns the channel of consumed messages.
func (b MessageBroker) Consume() <-chan app.ConsumedBrokerMessage {
	return b.consumedMessagesChan
}
A wrapper of upper.io/db.v3/postgresql
func (r DriversRepository) Insert(tx app.Tx, domainDriver domain.Driver) error {
	upperioTx, ok := tx.(sqlbuilder.Tx)
	if !ok {
		return app.NewDriversRepoError(errMsgWrongTxType, nil)
	}

	var d driver
	driversCollection := upperioTx.Collection(driversName)
	err := driversCollection.Find(db.Cond{"id": domainDriver.ID()}).One(&d)
	if err == nil {
		return app.NewDriversRepoError(errMsgInsert, app.ErrDuplicatedID)
	}
	if !errors.Is(err, db.ErrNoMoreRows) {
		return app.NewDriversRepoError(errMsgInsert, err)
	}

	driver := newDriver(domainDriver)
	if _, err := driversCollection.Insert(driver); err != nil {
		return app.NewDriversRepoError(errMsgInsert, err)
	}

	return nil
}
peter.bourgon.org/blog/2017/02/21/metrics-tracing-and-logging.html
There are more than 50 libs in the awesome-go repo.
Dave Cheney: logging libs are too complex
A simple implementation:
type Tag struct {
	Name  string
	Value string
}

type Logger interface {
	Debug(message string, tags ...Tag)
	Info(message string, tags ...Tag)
}
Tons of services: ElasticSearch, Graylog, Rollbar, AWS CloudWatch Logs, GCP StackDriver Logs (integrated with GKE)...
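To show how little code the Logger interface above needs, here is a sketch of an implementation that writes one line per message to an io.Writer. WriterLogger, its debug switch and the "LEVEL message name=value" line format are assumptions for illustration, not the project's logger.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

type Tag struct {
	Name  string
	Value string
}

type Logger interface {
	Debug(message string, tags ...Tag)
	Info(message string, tags ...Tag)
}

// WriterLogger is a hypothetical Logger that writes one line per message.
type WriterLogger struct {
	out   io.Writer
	debug bool // when false, Debug messages are dropped
}

var _ Logger = WriterLogger{}

// NewWriterLogger returns a logger writing to out.
func NewWriterLogger(out io.Writer, debug bool) WriterLogger {
	return WriterLogger{out: out, debug: debug}
}

// Debug logs the message only when debug output is enabled.
func (l WriterLogger) Debug(message string, tags ...Tag) {
	if l.debug {
		l.write("DEBUG", message, tags)
	}
}

// Info always logs the message.
func (l WriterLogger) Info(message string, tags ...Tag) {
	l.write("INFO", message, tags)
}

// write formats "LEVEL message name=value ..." and ignores write errors.
func (l WriterLogger) write(level, message string, tags []Tag) {
	var b strings.Builder
	b.WriteString(level + " " + message)
	for _, t := range tags {
		b.WriteString(" " + t.Name + "=" + t.Value)
	}
	b.WriteString("\n")
	_, _ = io.WriteString(l.out, b.String())
}

// render logs two messages into a buffer and returns what was written.
func render(debug bool) string {
	var b strings.Builder
	l := NewWriterLogger(&b, debug)
	l.Debug("cache miss")
	l.Info("round created", Tag{Name: "round_id", Value: "42"})
	return b.String()
}

func main() {
	fmt.Print(render(false)) // prints "INFO round created round_id=42"
	NewWriterLogger(os.Stdout, true).Debug("system start", Tag{Name: "env", Value: "dev"})
}
```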
// TODO: Gauges, Histograms
type Metrics interface {
	Count(ctx context.Context, name string, value int64, tags ...Tag) error
}
Implementations: StatsD, Prometheus, AWS CloudWatch Custom Metrics, GCP StackDriver Custom Metrics, NewRelic Custom Metrics...
Go libs:
Simple interface aggregation that will be passed all around our code via context.
// TODO: tracing
type Observer interface {
	Logger
	Metrics
}
Examples:
http.Handler (middleware)
app.CommandHandler and app.QueryHandler (middlewares)
Error wrapping has been introduced in Go >= 1.13:
func Is(err, target error) bool
func As(err error, target interface{}) bool
Wrapped error err.Error() result:
graphql: transaction: events store: op create a round: movement IDs can't be empty
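A message like the one above results from each layer adding its own prefix with the %w verb. The sketch below reproduces that string; ErrOp, ErrEmptyMovementIDs and createRound are hypothetical stand-ins for the project's own types, which are not shown in the slides.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrEmptyMovementIDs is a hypothetical domain sentinel error.
var ErrEmptyMovementIDs = errors.New("movement IDs can't be empty")

// ErrOp is a hypothetical typed error that names the failed operation.
type ErrOp struct {
	Op  string
	Err error
}

func (e ErrOp) Error() string { return "op " + e.Op + ": " + e.Err.Error() }

// Unwrap exposes the cause so errors.Is and errors.As can walk the chain.
func (e ErrOp) Unwrap() error { return e.Err }

// createRound simulates each layer wrapping the error of the layer below.
func createRound() error {
	var err error = ErrOp{Op: "create a round", Err: ErrEmptyMovementIDs}
	err = fmt.Errorf("events store: %w", err)
	err = fmt.Errorf("transaction: %w", err)
	return fmt.Errorf("graphql: %w", err)
}

// opOf returns the operation name found in the chain, or "" if there is none.
func opOf(err error) string {
	var opErr ErrOp
	if errors.As(err, &opErr) {
		return opErr.Op
	}
	return ""
}

func main() {
	err := createRound()
	fmt.Println(err)
	// graphql: transaction: events store: op create a round: movement IDs can't be empty
	fmt.Println(errors.Is(err, ErrEmptyMovementIDs)) // prints true
	fmt.Println(opOf(err))                           // prints "create a round"
}
```

errors.Is matches the sentinel through four layers of wrapping, which is what lets an outer layer react to a specific cause without parsing the message.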
Application example:
// infra/http/graphql package
func (r *queryResolver) Round(ctx context.Context, roundID string) (*Round, error) {
	result, err := r.GetRound.Handle(ctx, app.RoundQuery{RoundID: roundID})
	if err != nil {
		if errors.Is(err, app.ErrRoundNotFound) {
			return nil, nil
		}
		return nil, newErrGraphQL(err)
	}
	// rest
}
Example: GraphQL error typing
Elm is good at types too
Our uses:
Custom shell scripts and Makefile rules to generate the code and place it in the correct package.
Create context and observer, log start/stop and call run().
func main() {
	ctx := context.Background()

	// o11y - do this first because we want to inject it in the context
	observer := observerFromEnv(ctx, config.Env)
	observer.Info("system start")
	defer observer.Info("system stop")

	readyCh := make(chan struct{})
	go func() {
		<-readyCh
		observer.Info("system ready to serve")
	}()

	ctx = o11y.WithObserver(ctx, observer)
	if err := run(ctx, config, readyCh); err != nil {
		observer.Info("system error: " + err.Error())
	}
}
DI types: compile time (e.g. Google's Wire) vs runtime.
Our run function builds the dependency graph at startup and is end-to-end tested through the HTTP port.
signal.Notify()
type ctxKey struct{}

// WithObserver returns the provided context with an observer value.
func WithObserver(ctx context.Context, o Observer) context.Context {
	return context.WithValue(ctx, ctxKey{}, o)
}

// FromContext returns an Observer from the provided context.
func FromContext(ctx context.Context) Observer {
	observer, _ := ctx.Value(ctxKey{}).(Observer)
	return observer
}
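FromContext as shown returns a nil Observer when the context carries none, so the caller has to make sure one was injected. A possible variation, sketched here under the assumption of a one-method Observer and a hypothetical FromContextOrNoop helper, falls back to an observer that discards everything:

```go
package main

import (
	"context"
	"fmt"
)

// Observer is reduced to a single method for this example (assumption).
type Observer interface {
	Info(message string)
}

type ctxKey struct{}

// WithObserver returns the provided context with an observer value.
func WithObserver(ctx context.Context, o Observer) context.Context {
	return context.WithValue(ctx, ctxKey{}, o)
}

// noop is a hypothetical Observer that discards everything.
type noop struct{}

func (noop) Info(string) {}

// FromContextOrNoop is a hypothetical variant of FromContext: it returns a
// no-op Observer instead of nil when the context has no observer.
func FromContextOrNoop(ctx context.Context) Observer {
	if o, ok := ctx.Value(ctxKey{}).(Observer); ok {
		return o
	}
	return noop{}
}

// recorder is a test Observer that keeps the messages it receives.
type recorder struct{ messages []string }

func (r *recorder) Info(m string) { r.messages = append(r.messages, m) }

// handle stands for code deep in the call stack that only gets a context.
func handle(ctx context.Context) {
	FromContextOrNoop(ctx).Info("handling request")
}

// demo calls handle with and without an observer in the context.
func demo() []string {
	rec := &recorder{}
	handle(WithObserver(context.Background(), rec))
	handle(context.Background()) // no observer stored: must not panic
	return rec.messages
}

func main() {
	fmt.Println(demo()) // prints "[handling request]"
}
```

The trade-off is that a missing observer goes unnoticed instead of failing fast, so this may or may not suit a given codebase.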
Use http.Server.BaseContext as the base for the http.Handler's req.Context():
func NewServer(baseCtx context.Context, handler http.Handler) *http.Server {
	server := &http.Server{
		Handler: handler,
	}
	// serve requests with our own context
	server.BaseContext = func(ln net.Listener) context.Context {
		return baseCtx
	}
	return server
}
coreutils and gnu-sed brew packages for mac
gofmt for avoiding silly PR discussions
Documentation: