Skip to main content
  1. internet/

事件驱动系列—watermill

·8332 words·17 mins·

背景 #

使用HTTP构建应用时无需关注HTTP的底层协议,同样,使用事件驱动时同样也应无需关注事件的底层协议——watermill为我们封装好了这些功能。

Base Usage #

Event #

事件由两方面构成:数据结构和命名。

数据结构由发送方定义,但往往也要综合考虑订阅方的需求,比如员工离职事件可以只包含企业ID和员工ID,但员工更新事件可能就要包含更新字段的新旧两种数据,就像MySQL的binlog(Row格式)一样。

数据结构也不能任意改动,实际上数据结构是订阅双方的一种协议。如果一定要改数据结构,需要了解都有哪些订阅方,会造成哪些影响。

命名 #

在事件驱动的系统中会存在大量的事件,事件成了业务逻辑的“枢纽”,因此事件的命名尤为重要。

事件要命名为过去式,表示已发生的事件。

场景举例:当用户注册后,需要发送邮件。让我们看下这个事件应如何命名:UserSignedUp 还是SendWelcomeEmailIsReadyToSend?

前者代表了用户已经注册,后者表示将要发送邮件。看起来差别不大?

现在有个新需求,要在用户注册后将用户加入消息推送列表。

  • 对于UserSignedUp,只需要添加一个新的订阅者即可。
  • 对于SendWelcomeEmailIsReadyToSend,则需要在原有的代码基础上再发送一个JoinMessageSubscribeList事件。

这就是为什么要将事件命名为过去式!

Publisher #

事件需要发送者和订阅者,watermill自然也对这些进行了封装。

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

Publisher的API相当简洁(这也是watermill的一个设计哲学)

Constructor #

watermill中已经集成了多种Publisher,如kafka、rabbitmq、nats等等。我们以使用redis stream为例创建一个publisher:

func NewRedisPublisher(rdb *redis.Client, watermillLogger watermill.LoggerAdapter) message.Publisher {
	var pub message.Publisher
	pub, err := redisstream.NewPublisher(redisstream.PublisherConfig{Client: rdb}, watermillLogger)
	if err != nil {
		panic(err)
	}
	pub = observability.TracingPublisherDecorator{pub}
	return pub
}

Decortor #

可以使用装饰器来包装publisher,如在消息中加入链路信息:

type TracingPublisherDecorator struct {
	message.Publisher
}

func (p TracingPublisherDecorator) Publish(topic string, messages ...*message.Message) error {
	for i := range messages {
		otel.GetTextMapPropagator().Inject(messages[i].Context(), propagation.MapCarrier(messages[i].Metadata))
	}
	return p.Publisher.Publish(topic, messages...)
}

我们构造了一个TracingPublisherDecorator,使用时直接“套”在publisher上就可以了:

pub, err := redisstream.NewPublisher(redisstream.PublisherConfig{Client: rdb}, watermillLogger)
if err != nil {
  panic(err)
}
pub = observability.TracingPublisherDecorator{pub}

Config #

每个发送者的实现都依赖于“消息队列”,而每个消息队列支持的功能不同,配置也不同。基于redis的stream实现的发送者的配置如下:

type PublisherConfig struct {
	Client     redis.UniversalClient
	Marshaller Marshaller
	Maxlens    map[string]int64
}

Subscriber #

订阅者的封装逻辑基本上与Publisher一致,不再赘述。

Router #

router的用法和HTTP框架的router很像。

基本用法 #

  logger := watermill.NewStdLogger(false, false)

  router, err := message.NewRouter(message.RouterConfig{}, logger)
  if err != nil {
    panic(err)
  }
  rdb := redis.NewClient(&redis.Options{
    Addr: os.Getenv("REDIS_ADDR"),
  })
  sub, err := redisstream.NewSubscriber(redisstream.SubscriberConfig{
    Client: rdb,
  }, logger)
  if err != nil {
    panic(err)
  }

  pub, err := redisstream.NewPublisher(redisstream.PublisherConfig{
    Client: rdb,
  }, logger)
  if err != nil {
    panic(err)
  }

  router.AddHandler(
    "handler for test",
    "subscriber_topic",
    sub,
    "publisher_topic",
    pub,
    func(msg *message.Message) ([]*message.Message, error) {
      // 处理逻辑
      return []*message.Message{msg}, nil
    },
  )
	err = router.Run(context.Background())
	if err != nil {
		panic(err)
	}

上面的代码很像我们平时使用的HTTP框架。其中主要的元素为:

  • sub: 通过redis的stream构造的订阅者
  • pub: 通过redis的stream构造的发布者
  • subscriber_topic: 这个路由订阅的主题
  • publisher_topic: 这个路由发布的主题

AddNoPublisherHandler #

如果只是处理事件而无需再次发布事件,可以使用NoPublisherHandler.

router.AddNoPublisherHandler(
		"handler for test",
		"subscriber_topic",
		sub,
		func(msg *message.Message) error {
			// 处理逻辑
			return nil
		},
	)

可以对同一个topic添加了多个Handler,这样同一个topic的数据就可以被多个Handler处理。

实现了类似消费者组的功能,但不同于消费者组的消费逻辑,这里实现的是路由逻辑。比如说如果需要将事件保存到数据湖,则应该在路由层添加Handler。

Middleware #

中间件可以用来处理鉴权、增加日志、添加链路信息等等。

Log #

  router.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			logger.Info("log", watermill.LogFields{"payload":string(msg.Payload)})
			return h(msg)
		}
	})

Recover #

router.AddMiddleware(middleware.Recoverer)

Retry #

router.AddMiddleware(middleware.Retry{
		MaxRetries:      10,
		InitialInterval: time.Millisecond * 100,
		MaxInterval:     time.Second,
		Multiplier:      2,
		Logger:          watermillLogger,
	}.Middleware)

Prometheus #

var (
	messageProcessTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "messages",
			Name:      "processed_total",
			Help:      "The total processed messages",
		},
		[]string{"topic", "handler"},
	)
	messageProcessFailedTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "messages",
			Name:      "processed_failed_total",
			Help:      "The total processed messages",
		},
		[]string{"topic", "handler"},
	)
	messagesProcessingDuration = promauto.NewSummaryVec(
		prometheus.SummaryOpts{
			Namespace:  "messages",
			Name:       "processing_duration_seconds",
			Help:       "The total time spent processing messages",
			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
		},
		[]string{"topic", "handler"},
	)
)

router.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			topic := message.SubscribeTopicFromCtx(msg.Context())
			handler := message.HandlerNameFromCtx(msg.Context())
			labels := prometheus.Labels{"topic": topic, "handler": handler}
			messageProcessTotal.With(labels).Inc()
			now := time.Now()
			msgs, err := h(msg)
			if err != nil {
				messageProcessFailedTotal.With(labels).Inc()
			}

			messagesProcessingDuration.With(labels).Observe(time.Now().Sub(now).Seconds())
			return msgs, err
		}
	})

Tracing #

router.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) (events []*message.Message, err error) {
			topic := message.SubscribeTopicFromCtx(msg.Context())
			handler := message.HandlerNameFromCtx(msg.Context())

			ctx := otel.GetTextMapPropagator().Extract(msg.Context(), propagation.MapCarrier(msg.Metadata))
			ctx, span := otel.Tracer("").Start(
				ctx,
				fmt.Sprintf("topic: %s, handler: %s", topic, handler),
				trace.WithAttributes(
					attribute.String("topic", topic),
					attribute.String("handler", handler),
				),
			)
			defer span.End()
			msg.SetContext(ctx)

			msgs, err := h(msg)
			if err != nil {
				span.RecordError(err)
				span.SetStatus(codes.Error, err.Error())
			}
			return msgs, err
		}
	})

Decorators #

可以分别对订阅者和发布者构造装饰器,如保证每个发布者都携带correlation_id标识:

const correlationIDMessageMetadataKey = "correlation_id"
const correlationIDKey = "correlation_key"

type CorrelationPublisherDecorator struct {
	message.Publisher
}

func (c CorrelationPublisherDecorator) Publish(topic string, messages ...*message.Message) error {
	for i := range messages {
		// if correlation_id is already set, let's not override
		if messages[i].Metadata.Get(correlationIDMessageMetadataKey) != "" {
			continue
		}

		// correlation_id as const
		messages[i].Metadata.Set(correlationIDMessageMetadataKey, CorrelationIDFromContext(messages[i].Context()))
	}

	return c.Publisher.Publish(topic, messages...)
}

func CorrelationIDFromContext(ctx context.Context) string {
	v, ok := ctx.Value(correlationIDKey).(string)
	if ok {
		return v
	}

	// add "gen_" prefix to distinguish generated correlation IDs from correlation IDs passed by the client
	// it's useful to detect if correlation ID was not passed properly
	return "gen_" + shortuuid.New()
}

func main() {
  // ....
  router.AddPublisherDecorators(func(pub message.Publisher) (message.Publisher, error) {
		return CorrelationPublisherDecorator{pub}, nil
	})
  // ....
}

订阅者同理:

router.AddSubscriberDecorators(func(sub message.Subscriber) (message.Subscriber, error) {
		return CorrelationSubscriberDecorator{pub}, nil
	})

RouterPlugin #

RouterPlugin其实就是服务于Router的中间件,在这里可以添加处理路由的操作,比如监听路由启动完成、监听信号关闭路由。。。

router.AddPlugin(func(router *message.Router) error {
		<-router.Running()
		fmt.Println("started")
		return nil
	})

CQRS #

watermill提供了一整套框架来在事件驱动的架构中实现CQRS(Command-query responsibility segregation),其中主要有三个部分:Event Bus、Event Processor、Event Handler。

操作的时序图如下:

图片来自Three Dots Labs Academy

一个实际的使用场景可能是:

图片来自CQRS Component (watermill.io)

Event Bus #

Event Bus封装了事件的发布,使得发布相关代码更简洁。

Constructor #

func NewBus(publisher message.Publisher, logger watermill.LoggerAdapter) *cqrs.EventBus {
	eventBus, err := cqrs.NewEventBusWithConfig(
		publisher,
		cqrs.EventBusConfig{
			GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
				event, ok := params.Event.(entities.Event)
				if !ok {
					return "", fmt.Errorf("invalid event type: %T doesn't implement entities.Event", params.Event)
				}
				return params.EventName, nil
			},
			Marshaler: cqrs.JSONMarshaler{
				GenerateName: cqrs.StructName,
			},
			Logger: logger,
		})
	if err != nil {
		panic(err)
	}
	return eventBus
}

因为需要发送事件,因此构造Event Bus需要一个publisher。在cqrs.EventBusConfig中可以增强publisher的能力。

type EventBusConfig struct {
	// GeneratePublishTopic is used to generate topic name for publishing event.
	GeneratePublishTopic GenerateEventPublishTopicFn

	// OnPublish is called before sending the event.
	// The *message.Message can be modified.
	//
	// This option is not required.
	OnPublish OnEventSendFn

	// Marshaler is used to marshal and unmarshal events.
	// It is required.
	Marshaler CommandEventMarshaler

	// Logger instance used to log.
	// If not provided, watermill.NopLogger is used.
	Logger watermill.LoggerAdapter
}
  • GeneratePublishTopic: 可以针对Event动态生成topic,通常使用事件名称作为topic,也可以加上当前项目名称作为前缀
  • OnPublish:publish之前的钩子
  • Marshaler:序列化器,用于序列化和反序列化事件。常默认使用json(cqrs.JSONMarshaler)或protobuf(cqrs.ProtobufMarshaler
  • Logger:日志适配器

需要注意的是GeneratePublishTopic中使用的EventName是从MarshalerName方法中获取的。

Publish #

当我们有了Event Bus后,推送事件就变得很简单了, 且EventBus只有一个public API:

func (c EventBus) Publish(ctx context.Context, event any) error {}

Event Processor #

Event Bus负责事件的发送,相对应,Event Processor负责事件的接收。

Constructor #

func NewEventProcessorWithConfig(router *message.Router, config EventProcessorConfig) (*EventProcessor, error) {
	config.setDefaults()

	if err := config.Validate(); err != nil {
		return nil, errors.Wrap(err, "invalid config EventProcessor")
	}
	if router == nil && !config.disableRouterAutoAddHandlers {
		return nil, errors.New("missing router")
	}

	return &EventProcessor{
		router: router,
		config: config,
	}, nil
}

Event Processor用到了之前介绍过的Router来管理事件的接收和分发。我们着重看下EventProcessorConfig:

type EventProcessorConfig struct {
	// GenerateSubscribeTopic is used to generate topic for subscribing to events.
	// If event processor is using handler groups, GenerateSubscribeTopic is used instead.
	GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor is used to create subscriber for EventHandler.
	//
	// This function is called for every EventHandler instance.
	// If you want to re-use one subscriber for multiple handlers, use GroupEventProcessor instead.
	SubscriberConstructor EventProcessorSubscriberConstructorFn

	// OnHandle is called before handling event.
	// OnHandle works in a similar way to middlewares: you can inject additional logic before and after handling a event.
	//
	// Because of that, you need to explicitly call params.Handler.Handle() to handle the event.
	//
	//  func(params EventProcessorOnHandleParams) (err error) {
	//      // logic before handle
	//      //  (...)
	//
	//      err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // logic after handle
	//      //  (...)
	//
	//      return err
	//  }
	//
	// This option is not required.
	OnHandle EventProcessorOnHandleFn

	// AckOnUnknownEvent is used to decide if message should be acked if event has no handler defined.
	AckOnUnknownEvent bool

	// Marshaler is used to marshal and unmarshal events.
	// It is required.
	Marshaler CommandEventMarshaler

	// Logger instance used to log.
	// If not provided, watermill.NopLogger is used.
	Logger watermill.LoggerAdapter

	// disableRouterAutoAddHandlers is used to keep backwards compatibility.
	// it is set when EventProcessor is created by NewEventProcessor.
	// Deprecated: please migrate to NewEventProcessorWithConfig.
	disableRouterAutoAddHandlers bool
}
  • GenerateSubscribeTopic: 用于根据事件动态生成所需订阅的topic,与EventBusConfig的GeneratePublishTopic相对应,两者的函数签名也相似:

    type GenerateEventPublishTopicFn func(GenerateEventPublishTopicParams) (string, error)
    
    type GenerateEventPublishTopicParams struct {
    	EventName string
    	Event     any
    }
    
    type EventProcessorGenerateSubscribeTopicFn func(EventProcessorGenerateSubscribeTopicParams) (string, error)
    
    type EventProcessorGenerateSubscribeTopicParams struct {
    	EventName    string
    	EventHandler EventHandler
    }
    

    EventName的生成规则在Event Bus和Event Processor中要保持一致,而EventName是由Marshal控制的,因此两者需要使用同一个Marshal。

  • SubscriberConstructor:用于构造订阅者。可以根据不同的Event构造不同的订阅者。函数签名为:

    type EventProcessorSubscriberConstructorFn func(EventProcessorSubscriberConstructorParams) (message.Subscriber, error)
    
    type EventProcessorSubscriberConstructorParams struct {
    	HandlerName  string
    	EventHandler EventHandler
    }
    

    在生成的订阅者中,我们可以配置消费者组。如果消费者组依赖了事件或者Handler的属性,那么需要谨慎修改这些属性!

    redisstream.NewSubscriber(
      redisstream.SubscriberConfig{
        Client:        rdb,
        ConsumerGroup: "internal." + params.HandlerName,
      },
    watermillLogger)
    

EventHandler #

既然Event Processor是用来管理事件的接收和处理,那么也就需要管理事件处理的对象——EventHandler。其构造函数如下:

func NewEventHandler[T any](handlerName string, handleFunc func(ctx context.Context, event *T) error) EventHandler {
	return &genericEventHandler[T]{
		handleFunc:  handleFunc,
		handlerName: handlerName,
	}
}

handlerName只是handler的一个标识,不冲突即可。handlFunc是用来处理事件的函数。

EventHandler是一个接口,因此完全可以定制化的实现自己的Handler:

type EventHandler interface {
	// HandlerName is the name used in message.Router while creating handler.
	//
	// It will be also passed to EventsSubscriberConstructor.
	// May be useful, for example, to create a consumer group per each handler.
	//
	// WARNING: If HandlerName was changed and is used for generating consumer groups,
	// it may result with **reconsuming all messages** !!!
	HandlerName() string

	NewEvent() any

	Handle(ctx context.Context, event any) error
}

Event Processor只有一个public API:func (p *EventProcessor) AddHandlers(handlers ...EventHandler) error ,因此无需考虑复杂的管理操作。

Command Bus #

区别于Event,Command:

  • 不使用“过去式”来描述,而是“命令式”
  • 常常只有一个消费者

Command往往不关心执行结果,因此无需返回值。如果是异步处理,则往往使用消息队列;否则,可以使用http或者grpc。

虽然Command和Event的使用场景不同,但使用方式十分类似。

func NewBus(redisPublisher message.Publisher, watermillLogger *log.WatermillLogrusAdapter) *cqrs.CommandBus {
	commandBus, err := cqrs.NewCommandBusWithConfig(
		redisPublisher,
		cqrs.CommandBusConfig{
			GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
				return "commands." + params.CommandName, nil
			},
			Marshaler: cqrs.JSONMarshaler{
				GenerateName: cqrs.StructName,
			},
			Logger: watermillLogger,
		})
	if err != nil {
		panic(err)
	}
	return commandBus
}

// 使用示例
func Send(/*params*/) {
 bus := NewBus(params)
 bus.Send(ctx, Command)
}

Command Processor #

Command Processor用于处理命令接收和命令处理:

type SendNotification struct {
	NotificationID string
	Email          string
	Message        string
}

type Sender interface {
	SendNotification(ctx context.Context, notificationID, email, message string) error
}

func NewProcessor(router *message.Router, sender Sender, sub message.Subscriber, watermillLogger watermill.LoggerAdapter) *cqrs.CommandProcessor {
	eventProcessor, err := cqrs.NewCommandProcessorWithConfig(
		router,
		cqrs.CommandProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
				return "commands." + params.CommandName, nil
			},
			SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				return sub, nil
			},
			Marshaler: cqrs.JSONMarshaler{
				GenerateName: cqrs.StructName,
			},
			Logger: watermillLogger,
		},
	)
	if err != nil {
		panic(err)
	}

	err = eventProcessor.AddHandlers(cqrs.NewCommandHandler(
		"send_notification",
		func(ctx context.Context, event *SendNotification) error {
			return sender.SendNotification(ctx, event.NotificationID, event.Email, event.Message)
		},
	))
	if err != nil {
		panic(err)
	}

	return eventProcessor
}

RequestReply #

对于需要接收请求的同步命令处理,可以使用requestreply

执行命令:

err := commandProcessor.AddHandlers(
    requestreply.NewCommandHandler(
        "hotel_room_booking",
        ts.RequestReplyBackend,
        func(ctx context.Context, cmd *BookHotelRoom) error {
            return fmt.Errorf("some error")
        },
    ),
)

接受命令返回值:

reply, err := requestreply.SendWithReply[requestreply.NoResult](
    context.Background(),
    ts.CommandBus,
    ts.RequestReplyBackend,
    &BookHotelRoom{ID: "1"},
)

也可以在Handler处理返回值:

err := commandProcessor.AddHandlers(
    requestreply.NewCommandHandlerWithResult[PayForRoom, PayForRoomResult](
        "pay_for_room",
        ts.RequestReplyBackend,
        func(ctx context.Context, cmd *PayForRoom) (PayForRoomResult, error) {
            return PayForRoomResult{PaymentReference: "1234"}, nil
        },
    ),
)
// ...
reply, err := requestreply.SendWithReply[requestreply.NoResult](
    context.Background(),
    ts.CommandBus,
    ts.RequestReplyBackend,
    &TestCommand{ID: "1"},
)

// ...

fmt.Println(reply.Result.PaymentReference) // it's equal to "1234"
fmt.Println(reply.Error) // it's nil

Outbox #

场景 #

当用户下单成功后,我们既要扣除商品数量,又要发送下单成功通知。那么如何保证两个操作的一致性呢?

消息推送往往使用消息队列,而消息队列往往不支持事务,这时候可以使用数据库来做中转——将事件存入数据库中,然后再异步消费。

watermill中把这种功能进行了封装,称其为outbox。outbox能够确保一个消息最少能够发送成功一次。

outbox实际上是一种设计模式,可以参考这篇博客——Outbox, Inbox patterns and delivery guarantees explained - Event-Driven.io

如果你正在担心使用数据库带来的性能问题,可以参考这篇博客——Push-based Outbox Pattern with Postgres Logical Replication - Event-Driven.io

在设计“中转表”之前,阅读这篇文章可以少走弯路——How Postgres sequences issues can impact your messaging guarantees - Event-Driven.io

Publisher #

将事件保存在数据库并消费这个过程可以抽象为发送和订阅。因此outbox中的发送者也实现了Publisher.

func PublishInTx(
	message *message.Message,
	tx *sql.Tx,
	logger watermill.LoggerAdapter,
) error {
	publisher, err := watermillSQL.NewPublisher(
		tx,
		watermillSQL.PublisherConfig{
			SchemaAdapter: watermillSQL.DefaultPostgreSQLSchema{},
      AutoInitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		return err
	}
	return publisher.Publish("ItemAddedToCart", message)
}
  • SchemaAdapter: 配置数据库Schema

  • AutoInitializeSchema:是否自动初始化Schema,执行的SQL如下:

    CREATE TABLE IF NOT EXISTS [table name] (
        "offset" SERIAL,
        "uuid" VARCHAR(36) NOT NULL,
        "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
        "payload" JSON DEFAULT NULL,
        "metadata" JSON DEFAULT NULL,
        "transaction_id" xid8 NOT NULL,
        PRIMARY KEY ("transaction_id", "offset")
    );
    

Subscriber #

outbox中的订阅者使用方式如下:

func SubscribeForMessages(db *sqlx.DB, topic string, logger watermill.LoggerAdapter) (<-chan *message.Message, error) {
	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultPostgreSQLSchema{},
			OffsetsAdapter:   sql.DefaultPostgreSQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		return nil, err
	}
	messages, err := subscriber.Subscribe(context.Background(), topic)
	if err != nil {
		panic(err)
	}
	return messages, nil
}

SubscriberConfig提供了更丰富的配置,比如消费者组,重试间隔,backoff等等。

如果设置了自动初始化Schema,则会执行SQL:

CREATE TABLE IF NOT EXISTS [table name] (
    consumer_group VARCHAR(255) NOT NULL,
    offset_acked BIGINT,
    offset_consumed BIGINT NOT NULL,
    PRIMARY KEY(consumer_group)
;

Forwarder #

上述方式需要为每个topic设置两个表,可以使用Forwarder来让topic共用同一张表(实际是两个)。

Publisher #

将消息封装在一个更上层的topic上,比events_to_forward为例:

var outboxTopic = "events_to_forward"

func PublishInTx(
	msg *message.Message,
	tx *sql.Tx,
	logger watermill.LoggerAdapter,
) error {
	publisher, err := watermillSQL.NewPublisher(
		tx,
		watermillSQL.PublisherConfig{
			SchemaAdapter: watermillSQL.DefaultPostgreSQLSchema{},
		},
		logger,
	)
	if err != nil {
		return err
	}

	fp := forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
		ForwarderTopic: outboxTopic,
	})
	defer fp.Close()
	return fp.Publish("TopicName", msg)
}

Subscriber #

对应的订阅为:

func RunForwarder(
	db *sqlx.DB,
	rdb *redis.Client,
	outboxTopic string,
	logger watermill.LoggerAdapter,
) error {
	publisher, err := redisstream.NewPublisher(
		redisstream.PublisherConfig{
			Client: rdb,
		},
		logger,
	)
	if err != nil {
		return err
	}
	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultPostgreSQLSchema{},
			OffsetsAdapter:   sql.DefaultPostgreSQLOffsetsAdapter{},
			InitializeSchema: true,
		}, logger,
	)
	if err != nil {
		return err
	}
	fd, err := forwarder.NewForwarder(subscriber, publisher, logger, forwarder.Config{
		ForwarderTopic: outboxTopic,
	})
	if err != nil {
		return err
	}
	go func() {
		err = fd.Run(context.Background())
		if err != nil {
			panic(err)
		}
	}()
	<-fd.Running()
	return nil
}

事件顺序 #

在事件驱动的架构中,事件往往以topic的方式展现,而对topic的消费又往往是并行处理。

有些事件需要严格控制执行顺序,否则会导致数据错乱:比如说员工调岗事件和员工离职事件,如果先处理了员工离职事件,那么处理员工调岗事件就会造成相当大的困惑。

使用消费者组订阅所有事件 #

可以将所有的事件都放到同一个topic(比如events)中,然后使用消费者组——以达到事件fan-out到多个消费者的效果。

然后将相关的事件放入一个Group中进行处理,忽略不相关的事件:

	eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
		router,
		cqrs.EventGroupProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
				return "events", nil
			},
			SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
        // use ConsumerGroup
				sub, err := kafka.NewSubscriber(kafka.SubscriberConfig{
					Brokers:               []string{kafkaAddr},
					Unmarshaler:           kafka.DefaultMarshaler{},
					OverwriteSaramaConfig: newConfig(),
					ConsumerGroup:         params.EventGroupName,
				}, logger)
				if err != nil {
					return nil, err
				}
				return sub, nil
			},
			AckOnUnknownEvent: true,
			Marshaler:         cqrs.JSONMarshaler{},
			Logger:            logger,
		},
	)
	if err != nil {
		panic(err)
	}

	pub, _ := kafka.NewPublisher(kafka.PublisherConfig{
		Brokers:   []string{kafkaAddr},
		Marshaler: kafka.DefaultMarshaler{},
	}, logger)

	eventProcessor.AddHandlersGroup(
		"employee-related",
		cqrs.NewGroupEventHandler(HandleEmployeePositionTransfered),
		cqrs.NewGroupEventHandler(HandleEmployeeLeft),
	)

// newConfig
func newConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	return cfg
}

分片 #

上述方案是在消费端进行处理,缺点是消费端需要消费所有的事件。另外一种办法就是按照“实例ID”来进行分片,比如使用员工ID进行分片,那么同一个员工的事件就会顺序消费。以kafka为例:

pub, err := kafka.NewPublisher(kafka.PublisherConfig{
		Brokers: []string{kafkaAddr},
		Marshaler: kafka.NewWithPartitioningMarshaler(func(topic string, msg *message.Message) (string, error) {
			return msg.Metadata.Get("employee_id"), nil
		}),
	}, logger)

乐观锁 #

可以使用乐观锁来避免并发问题。我们使用version字段来标记当前记录的版本,每当发送一个事件时,将version放入事件实体中,然后在消费时,判断version的值是否正常。

cqrs.NewEventHandler("OnEmployeeLeft", func(ctx context.Context, event *EmployeeLeft) error {
			employee := getEmployee(event.EmployeeID)

			if event.Version-1 != employee.Version {
				return fmt.Errorf("version not match")
			}

			employee.IsLeft = true
			employee.Version = event.Version

			return nil
})

独自更新 #

有一种无需考虑并发问题的解决办法,那就是从源头解决问题:每个事件都不互相影响。

比如说员工离职事件只是将离职状态进行更改,员工调岗事件只是改变员工岗位而无需查询员工状态。这样两个事件就可以实现“独自更新”互不影响。

Metrics #

Prometheus #

Prometheus是一个用于监控和告警的工具包,由Cloud Native Computing Foundation (CNCF) 开发。它用于从各种来源收集、处理和显示指标,例如Kubernetes、节点和应用程序。

全局指标 #

全局变量往往会由于其众多的引用关系从而导致项目难以维护,但有些例外,如:监控指标、日志。

下面是一些基本指标:处理的消息数量、失败的消息数量、处理的时间分布。

var (
	messageProcessTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "messages",
			Name:      "processed_total",
			Help:      "The total processed messages",
		},
		[]string{"topic", "handler"},
	)
	messageProcessFailedTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "messages",
			Name:      "processed_failed_total",
			Help:      "The total processed messages",
		},
		[]string{"topic", "handler"},
	)
	messagesProcessingDuration = promauto.NewSummaryVec(
		prometheus.SummaryOpts{
			Namespace:  "messages",
			Name:       "processing_duration_seconds",
			Help:       "The total time spent processing messages",
			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
		},
		[]string{"topic", "handler"},
	)
)

注入到中间件 #

基本的监控指标在中间件中注入即可,无需影响业务代码。

func useMiddlewares(router *message.Router, watermillLogger watermill.LoggerAdapter) {
	router.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			topic := message.SubscribeTopicFromCtx(msg.Context())
			handler := message.HandlerNameFromCtx(msg.Context())
			labels := prometheus.Labels{"topic": topic, "handler": handler}
			messageProcessTotal.With(labels).Inc()
			now := time.Now()
			msgs, err := h(msg)
			if err != nil {
				messageProcessFailedTotal.With(labels).Inc()
			}

			messagesProcessingDuration.With(labels).Observe(time.Now().Sub(now).Seconds())
			return msgs, err
		}
	})
}

开放入口 #

Prometheus基本上都是采用服务端拉取的方式进行采集,因此作为客户端的服务需要提供接口:

// echo
import (
  libHttp "github.com/ThreeDotsLabs/go-event-driven/common/http"
  "github.com/prometheus/client_golang/prometheus/promhttp"
  "github.com/labstack/echo/v4"
)

func NewHttpRouter(){
  e := libHttp.NewEcho()
	e.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
}
// iris
import (
	"github.com/kataras/iris/v12"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func NewHttpRouter(){
	app := iris.New()
	app.Get("/metrics", iris.FromStd(promhttp.Handler()))
}

消息队列exporter #

对于消息队列的Pub/Sub监控,可以在Prometheus官网找到对应的exporter。

Tracing #

链路追踪与指标监控一样,都是微服务时代必不可少的组件。链路追踪能够将追踪一个请求的完整信息,从而能让使用者更了解系统的运作,也能够快速定位错误。

目前常用的链路协议为OpenTelemetry。

OpenTelemetry和Jaeger的区别:

OpenTelemetry和Jaeger都是用于分布式跟踪的工具,但是它们之间有一些区别。OpenTelemetry是一种开放标准,它提供了一组API和SDK,用于在应用程序中生成和传输跟踪数据。OpenTelemetry支持多种编程语言和框架,并且可以与多种后端跟踪系统集成。

Jaeger是一个实现了OpenTracing标准的开源跟踪系统。它提供了一个完整的跟踪解决方案,包括收集、存储和查询跟踪数据的组件。Jaeger支持多种后端存储,包括Cassandra、Elasticsearch和Memory等。

因此,OpenTelemetry是一种标准和API/SDK,而Jaeger是一个完整的跟踪系统。OpenTelemetry可以与Jaeger一起使用,或者与其他跟踪系统集成。

消息系统中的链路传播 #

链路传播需要维护上下文信息,OpenTelemetry推荐的格式为 W3C Trace Context

Publisher #

在发送事件时需要携带链路信息(将上下文中的链路信息提取出来放到消息header中),我们可以通过装饰器模式实现:

type TracingPublisherDecorator struct {
	message.Publisher
}

func (p TracingPublisherDecorator) Publish(topic string, messages ...*message.Message) error {
	for i := range messages {
		otel.GetTextMapPropagator().Inject(messages[i].Context(), propagation.MapCarrier(messages[i].Metadata))
	}
	return p.Publisher.Publish(topic, messages...)
}

使用装饰器:

var pub message.Publisher
pub, _ = redisstream.NewPublisher(...)
pub = PublishDecorator{pub}

Middleware #

在路由层,我们需要从消息的header中提取链路信息,然后保存到上下文中:

func useMiddlewares(router *message.Router, watermillLogger watermill.LoggerAdapter) {
	router.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) (events []*message.Message, err error) {
			topic := message.SubscribeTopicFromCtx(msg.Context())
			handler := message.HandlerNameFromCtx(msg.Context())

			ctx := otel.GetTextMapPropagator().Extract(msg.Context(), propagation.MapCarrier(msg.Metadata))
			ctx, span := otel.Tracer("").Start(
				ctx,
				fmt.Sprintf("topic: %s, handler: %s", topic, handler),
				trace.WithAttributes(
					attribute.String("topic", topic),
					attribute.String("handler", handler),
				),
			)
			defer span.End()
			msg.SetContext(ctx)

			msgs, err := h(msg)
			if err != nil {
				span.RecordError(err)
				span.SetStatus(codes.Error, err.Error())
			}
			return msgs, err
		}
	})
}

在这个过程中,我们启动了一个新的Span,并记录了topic和handler(保存在Span的属性中),并且如果发生了错误还会记录错误状态。

Handler #

我们通过Publisher和Middleware成功的在异步消息中传递了链路信息,那么当我们执行sql、调用接口等等操作时如何上报链路呢?

Postgres #

Opentelemetry已经提供了Postgres的插件:

import (
	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq"
	"github.com/uptrace/opentelemetry-go-extra/otelsql"
	"go.opentelemetry.io/otel/semconv/v1.4.0"
)

func getDB() *sqlx.DB {
	traceDB, err := otelsql.Open(
		"postgres",
		os.Getenv("POSTGRES_URL"),
		otelsql.WithAttributes(semconv.DBSystemPostgreSQL),
		otelsql.WithDBName("db"),
	)
	if err != nil {
		panic(err)
	}
	dbConn := sqlx.NewDb(traceDB, "postgres")
	return dbConn
}
Http #

Opentelemetry也提供了http的插件:

import (
	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
  "net/http"
)

func getHttpClient() *http.Client {
	traceHttpClient := &http.Client{
		Transport: otelhttp.NewTransport(
			http.DefaultTransport,
			otelhttp.WithSpanNameFormatter(func(operation string, r *http.Request) string {
				return fmt.Sprintf("HTTP %s %s %s", r.Method, r.URL.String(), operation)
			})),
	}
}

TraceProvider #

Opentelemetry是链路追踪的一个SDK,那么链路信息要上报到哪里呢?OpenTelemetry提供了多种Provider,最常用的是Jaeger。

package observability

import (
	"fmt"
	"github.com/ThreeDotsLabs/watermill/message"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/jaeger"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	"go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/semconv/v1.21.0"
	"os"
)

func ConfigureTraceProvider() *trace.TracerProvider {
	jaegerEndpoint := os.Getenv("JAEGER_ENDPOINT")
	if jaegerEndpoint == "" {
		jaegerEndpoint = fmt.Sprintf("%s/jaeger-api/api/traces", os.Getenv("GATEWAY_ADDR"))
	}

	exp, err := jaeger.New(
		jaeger.WithCollectorEndpoint(
			jaeger.WithEndpoint(jaegerEndpoint),
		),
	)
	if err != nil {
		panic(err)
	}

	tp := trace.NewTracerProvider(
		// WARNING: `tracesdk.WithSyncer` should be not used in production.
		// For production, you should use `tracesdk.WithBatcher`.
		trace.WithSyncer(exp),
		trace.WithResource(resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceName("tickets"),
		)),
	)

	otel.SetTracerProvider(tp)

	// Don't forget this line! Omitting it will cause the trace to not be propagated via messages.
	otel.SetTextMapPropagator(propagation.TraceContext{})

	return tp
}

Outbox兼容 #

之前我们提到,为了实现一致性,我们先通过事务将一些事件存储到数据库,这时候要如何保存链路信息呢?

其实也很简单,因为我们本来就存储了事件的header,所以只需要在存储到数据库之前将上下文信息存入header即可(利用之前写好的装饰器可以很方便的实现这一功能):

func PublishInTx(
	msg *message.Message,
	tx *sql.Tx,
	logger watermill.LoggerAdapter,
) error {
	var publisher message.Publisher
	var err error

	publisher, err = watermillSQL.NewPublisher(
		tx,
		watermillSQL.PublisherConfig{
			SchemaAdapter: watermillSQL.DefaultPostgreSQLSchema{},
		},
		logger,
	)
	if err != nil {
		return fmt.Errorf("failed to create outbox publisher: %w", err)
	}

	publisher = TracingPublisherDecorator{publisher}

	publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
		ForwarderTopic: outboxTopic,
	})

	publisher = TracingPublisherDecorator{publisher}

	return publisher.Publish("ItemAddedToCart", msg)
}

值得注意的是在PublishInTx中,我们使用了两次装饰器:

  1. 第一次装饰了watermillSQL.Publisher: 将上下文的链路信息提取到header,并保存到数据库
  2. 第二次装饰了forwarder.Publisher:将从数据库中的事件发布到forwarder这个过程的上下文信息中的链路信息提取到header。

第二次装饰器并不是简单的重复提取链路信息,而是携带了forwarder过程中的数据库信息。

可用性 #

限流 #

如果高并发地消费事件会对下游服务造成压力,那么可以考虑限流。

watermill中提供了限流中间件:

router, _ := message.NewRouter(message.RouterConfig{}, logger)
router.AddMiddleware(middleware.NewThrottle(10, time.Second).Middleware) // 允许一秒最多处理10个消息

熔断 #

熔断也是常用的微服务治理工具:当下游服务出现大量报错后,身为上游服务不应再给予下游服务大量压力,而应该等待一段时间后再去访问,以防止整个系统雪崩。

watermill中使用gobreaker作为其熔断中间件的实现:

import (
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/sony/gobreaker"
  "github.com/ThreeDotsLabs/watermill/message"
)


router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
  return err
}
router.AddMiddleware(middleware.NewCircuitBreaker(gobreaker.Settings{
  Name:        "breaker",
  MaxRequests: 1,
  Timeout:     time.Second,
}).Middleware)

死信队列 #

如果一个消息被损坏而不能正常消费,这时候就要把它放入“死信队列”,以待更进一步的检查。

watermill中提供了该功能的中间件:

router.AddMiddleware(middleware.PoisonQueue(publisher, "poison_queue"))

设置了上述中间件后,如果消息在消费过程中遇到了错误,并且返回了错误,那么这个消息会被ACK,并发送到poison_queue中。

上面的中间件有些过于“粗暴”,因为有些消息在消费过程中可能遇到了临时性的错误,那么NACK后过一段时间再次消费即可。这时候可以使用过滤器:

pq, err := middleware.PoisonQueueWithFilter(pub, "PoisonQueue", func(err error) bool {
	var permErr PermanentError
	if errors.As(err, &permErr) && permErr.IsPermanent() {
		return true
	}

	return false
})
router.AddMiddleware(pq)

当然,也可以搭配Retry中间件使用——重试几次后依然报错,则放入死信队列:

router.AddMiddleware(
	middleware.PoisonQueue(publisher, "poison_queue"),
	middleware.Retry{
		// Config
	}.Middleware,
)

watermill会将一些元数据放到header中,这些key为:

  • middleware.ReasonForPoisonedKey: 消息被放到死信队列的原因,通常是报错信息
  • middleware.PoisonedTopicKey: 消息被放到死信队列前的队列名称
  • middleware.PoisonedHandlerKey:消息被放到死信队列前的Handler名称
  • middleware.PoisonedSubscriberKey:消息被放到死信队列前的订阅者的名称

使用方式如:reason := msg.Metadata.Get(middleware.ReasonForPoisonedKey)