mess

v0.0.11
Published: Jan 16, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

README

mess

An early prototype of a process manager, service mesh, and event bus.

The main goal is to provide a simple and reliable foundation for running services without containers on bare-metal servers, while having some basics out of the box.

Those basics are currently:

  • Node and cluster manager with an API (certificate rotation, gossip, logging)
  • Process manager with an API (control, deployment, logging)
  • Service mesh with TLS and automatic routing of HTTP traffic
  • Event bus, durable and ephemeral messaging

None of the above is production-ready.

Planned:

  • Optional health checks
  • Metrics collection
  • Security audit
  • Distributed config
  • Pass additional streams to the binary (on Linux)

Documentation

Index

Constants

const (
	EnvMode    = "MESS_MODE"
	EnvNodeID  = "MESS_NODE_ID"
	EnvRealm   = "MESS_REALM"
	EnvService = "MESS_SERVICE"
	EnvAlias   = "MESS_ALIAS"
	EnvDataDir = "MESS_DATA_DIR"
	EnvProxy   = "MESS_PROXY"
)
const (
	TargetNodeHeader    = "X-Mess-Target-Node"
	TargetRealmHeader   = "X-Mess-Target-Realm"
	TargetServiceHeader = "X-Mess-Target-Service"

	CallerHeader = "X-Mess-Caller"
)
const MaxEventSize = 16 << 20
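
For reference, `16 << 20` is 16 MiB (16,777,216 bytes). The sketch below shows a client-side size check that could run before publishing. It assumes the limit applies to the raw payload of a single event, which the documentation does not state; `checkEventSize` and `errEventTooLarge` are hypothetical names, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// MaxEventSize mirrors the package constant: 16 << 20 = 16,777,216 bytes.
const MaxEventSize = 16 << 20

// errEventTooLarge is a hypothetical error used only in this sketch.
var errEventTooLarge = errors.New("event exceeds MaxEventSize")

// checkEventSize is a hypothetical helper. It assumes the limit applies
// to the raw payload bytes of a single event.
func checkEventSize(payload []byte) error {
	if len(payload) > MaxEventSize {
		return errEventTooLarge
	}
	return nil
}

func main() {
	fmt.Println(MaxEventSize)
	fmt.Println(checkEventSize(make([]byte, 1024)))
	fmt.Println(checkEventSize(make([]byte, MaxEventSize+1)))
}
```
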
const PublicPort = 2701
const ServiceName = "mess"

Variables

var (
	ErrSubscriptionExists   = errors.New("subscription already exists")
	ErrSubscriptionNotExist = errors.New("subscription does not exist")
)
var (
	ErrInterrupt        = errors.New("system interrupt")
	ErrInvalidNode      = errors.New("invalid node")
	ErrNoCertProvided   = errors.New("no certificates provided")
	ErrInternalEndpoint = errors.New("internal endpoint")
)
var DefaultAPI = API{Client: NewClient()}
var DefaultClient = NewClient()

Functions

func DialContext

func DialContext(ctx context.Context, _ string, _ string) (net.Conn, error)

func DialH2CContext

func DialH2CContext(ctx context.Context, _, _ string, _ *tls.Config) (net.Conn, error)

func NewClient

func NewClient() *http.Client

func NewCustomClient

func NewCustomClient(node uint64, realm string, service string) *http.Client

NewCustomClient creates a client targeting a specific realm, node, and/or service. An empty realm is treated as the current realm. A zero node and an empty service are ignored.
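
The targeting rules above can be illustrated with a stand-in transport built only on the standard library. This is a sketch, not the package's RoundTripper: it assumes targeting is expressed through the `X-Mess-Target-*` headers listed under Constants, that the node ID is sent as a decimal string, and that an empty realm simply leaves the realm header unset. `targetTransport`, `recordingTransport`, `targetHeaders`, and the URL are hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
	"strconv"
)

// Header names copied from the package constants.
const (
	TargetNodeHeader    = "X-Mess-Target-Node"
	TargetRealmHeader   = "X-Mess-Target-Realm"
	TargetServiceHeader = "X-Mess-Target-Service"
)

// targetTransport is a hypothetical stand-in for the package's RoundTripper.
type targetTransport struct {
	Realm   string
	Node    uint64
	Service string
	Base    http.RoundTripper
}

func (t targetTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	// A RoundTripper must not modify the caller's request, so clone it first.
	r := req.Clone(req.Context())
	if t.Realm != "" {
		r.Header.Set(TargetRealmHeader, t.Realm)
	}
	if t.Node != 0 {
		// Assumption: the node ID travels as a decimal string.
		r.Header.Set(TargetNodeHeader, strconv.FormatUint(t.Node, 10))
	}
	if t.Service != "" {
		r.Header.Set(TargetServiceHeader, t.Service)
	}
	return t.Base.RoundTrip(r)
}

// recordingTransport captures the headers it receives instead of using the network.
type recordingTransport struct{ seen http.Header }

func (rt *recordingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	rt.seen = req.Header.Clone()
	return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody, Request: req}, nil
}

// targetHeaders returns the routing headers the sketch attaches for the given target.
func targetHeaders(node uint64, realm, service string) map[string]string {
	rec := &recordingTransport{}
	t := targetTransport{Realm: realm, Node: node, Service: service, Base: rec}
	req, err := http.NewRequest(http.MethodGet, "http://billing/ping", nil)
	if err != nil {
		return nil
	}
	resp, err := t.RoundTrip(req)
	if err != nil {
		return nil
	}
	resp.Body.Close()
	out := map[string]string{}
	for _, k := range []string{TargetNodeHeader, TargetRealmHeader, TargetServiceHeader} {
		if v := rec.seen.Get(k); v != "" {
			out[k] = v
		}
	}
	return out
}

func main() {
	fmt.Println(targetHeaders(7, "", "billing"))
}
```

In real code, NewCustomClient or NewCustomTransport should be preferred; the sketch only makes the "zero node and empty service are ignored" rule concrete.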

Types

type API

type API struct {
	Client *http.Client
}

API provides methods to interact with the mess.

func (API) CreateSubscription

func (a API) CreateSubscription(filename string, topic string, cursor EventCursor) (*Subscription, error)

CreateSubscription creates a persistent Subscription to the provided topic. Subscription automatically manages cursor persistence and resumes processing from where it left off.

The provided cursor is used as a starting position (offset). If cursor is nil or empty, the subscription starts from the most recent events.

If the file exists, an error is returned; OpenSubscription should be used to open an existing one.

func (API) Emit

func (a API) Emit(ctx context.Context, req EmitRequest) error

func (API) Listen

func (a API) Listen(ctx context.Context, topic string) (<-chan *Event, error)

func (API) NextNamedSequence

func (a API) NextNamedSequence(ctx context.Context, name string) (uint64, error)

NextNamedSequence returns the next sequential identifier associated with the specified name, which is globally unique across all cluster nodes. It consists of a 16-bit node ID and a 48-bit counter.

func (API) NextSequence

func (a API) NextSequence(ctx context.Context) (uint64, error)

NextSequence returns the next sequential ID, which is globally unique across all nodes in the cluster. It consists of a 16-bit node ID and a 48-bit counter.
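
To make the 16-bit node ID plus 48-bit counter layout concrete, here is a small bit-packing sketch. The documentation gives the field widths but not their order, so placing the node ID in the upper 16 bits is an assumption; `composeSequence` and `splitSequence` are hypothetical helpers, not package functions.

```go
package main

import "fmt"

const (
	counterBits = 48
	counterMask = 1<<counterBits - 1
)

// composeSequence packs a node ID and a counter into one uint64.
// Assumption: the node ID occupies the upper 16 bits.
func composeSequence(node uint16, counter uint64) uint64 {
	return uint64(node)<<counterBits | counter&counterMask
}

// splitSequence reverses composeSequence.
func splitSequence(id uint64) (node uint16, counter uint64) {
	return uint16(id >> counterBits), id & counterMask
}

func main() {
	id := composeSequence(3, 42)
	node, counter := splitSequence(id)
	fmt.Printf("id=%d node=%d counter=%d\n", id, node, counter)
}
```

Under this layout, IDs produced on different nodes can never collide because they differ in the node bits, and each node can issue up to 2^48 values before its counter wraps.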

func (API) OpenSubscription

func (a API) OpenSubscription(topic string, filename string) (*Subscription, error)

OpenSubscription opens a previously created persistent Subscription. Subscription automatically manages cursor persistence and resumes processing from where it left off.

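If the file does not exist, an error is returned; CreateSubscription should be used to create a new one. Note that the argument order differs from CreateSubscription: here the topic comes first, followed by the filename.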
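
The create-versus-open contract (create fails if the file exists, open fails if it does not) maps naturally onto `os.OpenFile` flags. The sketch below shows one plausible way to get these semantics with the standard library; it is not the package's implementation, and `createStore`, `openStore`, `runDemo`, and the file name are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// createStore fails with fs.ErrExist if the file is already there (O_EXCL).
func createStore(filename string) (*os.File, error) {
	return os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o600)
}

// openStore fails with fs.ErrNotExist if the file is missing (no O_CREATE).
func openStore(filename string) (*os.File, error) {
	return os.OpenFile(filename, os.O_RDWR, 0)
}

// runDemo exercises both helpers in a temporary directory and reports whether
// opening a missing file and re-creating an existing file fail as expected.
func runDemo() (openMissingFails, createExistingFails bool, err error) {
	dir, err := os.MkdirTemp("", "mess-sketch")
	if err != nil {
		return false, false, err
	}
	defer os.RemoveAll(dir)
	name := filepath.Join(dir, "orders.cursor")

	_, openErr := openStore(name)
	openMissingFails = errors.Is(openErr, fs.ErrNotExist)

	f, err := createStore(name)
	if err != nil {
		return false, false, err
	}
	f.Close()

	_, createErr := createStore(name)
	createExistingFails = errors.Is(createErr, fs.ErrExist)
	return openMissingFails, createExistingFails, nil
}

func main() {
	fmt.Println(runDemo())
}
```

A caller that does not know whether the subscription file exists would typically try the open path first and fall back to the create path on a "does not exist" error.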

func (API) Peers added in v0.0.4

func (a API) Peers(ctx context.Context, service string) (NodeList, error)

Peers returns a list of nodes with services matching the specified service name. The current service (the one making the call) is excluded. Only services in the current realm are considered. Services are returned regardless of their current availability or runtime state.

func (API) Publish

func (a API) Publish(ctx context.Context, req PublishRequest) ([]monotime.UUID, error)

Publish publishes the provided events on the specified topic. Meta fields, if specified, will be stored with each event. Publish guarantees that either all the provided events are stored or none. If a nil error is returned, all events are successfully published and the returned slice contains the IDs of the stored events.
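
As an illustration of the request shape, the sketch below encodes a value with the same fields and JSON tags as PublishRequest. The types are local copies so the example compiles on its own; the topic, meta field, and payloads are made up, and whether the package sends exactly this JSON on the wire is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// metaField and publishRequest are local copies of MetaField and PublishRequest.
type metaField struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

type publishRequest struct {
	Topic  string            `json:"topic"`
	Meta   []metaField       `json:"meta"`
	Events []json.RawMessage `json:"events"`
}

// encodeExample builds a two-event request and returns its JSON encoding.
func encodeExample() (string, error) {
	req := publishRequest{
		Topic: "orders",
		Meta:  []metaField{{Key: "source", Value: "web"}},
		Events: []json.RawMessage{
			json.RawMessage(`{"id":1}`),
			json.RawMessage(`{"id":2}`),
		},
	}
	b, err := json.Marshal(req)
	return string(b), err
}

func main() {
	s, err := encodeExample()
	if err != nil {
		fmt.Println("encode:", err)
		return
	}
	fmt.Println(s)
}
```

Because Events holds `json.RawMessage` values, each payload must already be valid JSON; `json.Marshal` rejects malformed raw messages.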

func (API) Resolve added in v0.0.5

func (a API) Resolve(ctx context.Context, service string) (NodeList, error)

Resolve returns a list of nodes with services matching the specified service name and available at the time of the call. Only services in the current realm are considered.

func (API) State

func (a API) State(ctx context.Context) (*NodeState, error)

State returns the state of the current node along with a map of the cluster.

func (API) Subscribe

func (a API) Subscribe(ctx context.Context, req SubscribeRequest, handler func(*Event) (bool, error)) error

Subscribe receives events from the specified topic using the provided cursor as an offset. If the cursor is nil or empty, the subscription starts from the most recent events. An optional filter is used for server-side filtering of incoming events.

The handler is called for each event. If the handler returns false or a non-nil error, processing stops and the error (if any) is returned. In that case the event is not considered handled and will be delivered again later.

Subscribe transparently reconnects with exponential backoff if needed. Connection errors do not stop processing; they are logged using the standard log package.

If the context is canceled or expires, Subscribe waits until the currently running handler returns, then stops processing and returns the context error.

Subscribe blocks until the context is canceled or expires, or the handler returns false or a non-nil error.

Subscribe does not provide cursor persistence; use CreateSubscription for that.
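
The handler contract can be sketched without a running node. Below, a hypothetical `dispatch` loop applies the documented rules to an in-memory slice: it stops on `false` or on a non-nil error, and the event that triggered the stop is not counted as handled. `Event` is a trimmed local stand-in (the real type uses monotime.UUID and json.RawMessage), and `exampleHandler` is purely illustrative.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a trimmed local stand-in for the package's Event type.
type Event struct {
	ID   string
	Data string
}

// dispatch is a hypothetical loop that mimics the documented handler contract.
// It returns the number of events considered handled.
func dispatch(events []*Event, handler func(*Event) (bool, error)) (int, error) {
	handled := 0
	for _, e := range events {
		ok, err := handler(e)
		if err != nil {
			return handled, err // stop; the error is returned to the caller
		}
		if !ok {
			return handled, nil // stop without an error
		}
		handled++
	}
	return handled, nil
}

// exampleHandler fails on empty payloads and asks to stop on "stop".
func exampleHandler(e *Event) (bool, error) {
	switch e.Data {
	case "":
		return false, errors.New("empty payload")
	case "stop":
		return false, nil
	}
	return true, nil
}

func main() {
	events := []*Event{
		{ID: "1", Data: "a"},
		{ID: "2", Data: "b"},
		{ID: "3", Data: "stop"},
		{ID: "4", Data: "c"},
	}
	fmt.Println(dispatch(events, exampleHandler))
}
```

Since an unhandled event is delivered again later, handlers should be idempotent or return `true` only after their side effects are durable.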

type EmitRequest

type EmitRequest struct {
	Topic  string            `json:"topic"`
	Meta   []MetaField       `json:"meta"`
	Events []json.RawMessage `json:"events"`
}

type Environment

type Environment struct {
	// Dev is true when EnvMode is "dev" or not set.
	Dev bool
	// Mode holds the value of the EnvMode variable.
	Mode string
	// NodeID is the unique identifier of the mess node, taken from EnvNodeID.
	NodeID uint64
	// Service holds the service name from EnvService.
	Service string
	// Alias holds service aliases from EnvAlias.
	Alias []string
	// Realm holds the realm (namespace) name from EnvRealm.
	Realm string
	// DataDir points to the persistent directory for service data, taken from EnvDataDir.
	DataDir string
	// Proxy configuration (port or socket) from EnvProxy.
	Proxy string

	// ProxyNetwork is a helper field that holds the network type for net.Listen.
	// Typically, this field should not be used directly; prefer functions such as
	// NewClient, NewTransport, NewCustomClient, or NewCustomTransport instead.
	ProxyNetwork string
	// ProxyAddr is a helper field that holds the address for net.Listen.
	// Typically, this field should not be used directly; prefer functions such as
	// NewClient, NewTransport, NewCustomClient, or NewCustomTransport instead.
	ProxyAddr string
}

func Env

func Env() Environment
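
The field comments above suggest how the environment variables map to Environment. The following sketch reproduces part of that mapping with the standard library only. It is not the package's Env implementation: `readEnv` and `environment` are hypothetical, the node ID is assumed to be a decimal number, and the Alias and Proxy fields are left out because their formats are not documented here.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// Variable names copied from the package constants.
const (
	EnvMode    = "MESS_MODE"
	EnvNodeID  = "MESS_NODE_ID"
	EnvRealm   = "MESS_REALM"
	EnvService = "MESS_SERVICE"
	EnvDataDir = "MESS_DATA_DIR"
)

// environment is a trimmed local stand-in for the Environment struct.
type environment struct {
	Dev     bool
	Mode    string
	NodeID  uint64
	Service string
	Realm   string
	DataDir string
}

// readEnv takes a lookup function so it can be exercised without touching
// the real process environment.
func readEnv(getenv func(string) string) environment {
	mode := getenv(EnvMode)
	// Assumption: the node ID is decimal; a parse failure leaves it at zero.
	id, _ := strconv.ParseUint(getenv(EnvNodeID), 10, 64)
	return environment{
		Dev:     mode == "" || mode == "dev", // "dev" or not set
		Mode:    mode,
		NodeID:  id,
		Service: getenv(EnvService),
		Realm:   getenv(EnvRealm),
		DataDir: getenv(EnvDataDir),
	}
}

func main() {
	fmt.Printf("%+v\n", readEnv(os.Getenv))
}
```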

type Event

type Event struct {

	// ID is the unique event identifier in UUID v7 format. Set by the node.
	// It contains the event time and the ID of the node that published the event.
	// This ID is guaranteed to be unique within the cluster.
	ID monotime.UUID `json:"id"`

	// Topic on which the event was published. Required.
	Topic string `json:"topic"`

	// Meta holds user-provided metadata.
	Meta []MetaField `json:"meta,omitempty"`

	// Data is the event payload.
	Data json.RawMessage `json:"data"`
}

type EventCursor

type EventCursor []monotime.UUID

EventCursor is a vector clock that represents a per-node event position vector based on monotime UUIDs.

Each entry in the cursor stores the last observed event UUID for a specific node. The node identity is encoded inside monotime.UUID.

At most one UUID is stored per node. For each node, the UUID represents the latest known event for that node.

func (EventCursor) Clone added in v0.0.7

func (ec EventCursor) Clone() EventCursor

Clone returns a copy of the cursor.

func (EventCursor) ExtractNode

func (ec EventCursor) ExtractNode(id uint64) monotime.UUID

ExtractNode returns the event ID associated with the given node ID. If the node is not present in the cursor, ExtractNode returns monotime.ZeroUUID.

func (EventCursor) Merge added in v0.0.7

func (ec EventCursor) Merge(cursor EventCursor) EventCursor

Merge merges another cursor into the current one.

For each node, the resulting cursor contains the newest event ID. If a node exists in both cursors, Update semantics are applied. Nodes present only in the provided cursor are appended.

Merge updates the cursor in-place. The returned cursor shares the same underlying array.

func (EventCursor) Nodes added in v0.0.7

func (ec EventCursor) Nodes() []uint64

Nodes returns the list of node IDs present in the cursor.

func (EventCursor) Set added in v0.0.7

func (ec EventCursor) Set(uuid monotime.UUID) EventCursor

Set overwrites the cursor entry for the node encoded in the uuid. If the node is not present, the provided uuid is appended.

Set updates the cursor in-place. The returned cursor shares the same underlying array.

func (EventCursor) Update

func (ec EventCursor) Update(uuid monotime.UUID) EventCursor

Update updates the cursor entry for the node encoded in the provided uuid if it is newer than the existing value associated with that node. If the node is not yet present in the cursor, uuid is appended.

Update updates the cursor in-place. The returned cursor shares the same underlying array.
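
The Update, Set, and Merge rules can be sketched with a simplified cursor. Because monotime.UUID is an external type, the example replaces it with a hypothetical `eventID` pair (node ID plus a sequence number standing in for the time-ordered part of the UUID); "newer" is assumed to mean a larger sequence number.

```go
package main

import "fmt"

// eventID is a hypothetical stand-in for monotime.UUID.
type eventID struct {
	Node uint64
	Seq  uint64
}

// cursor holds at most one entry per node.
type cursor []eventID

// Update replaces the entry for id.Node only if id is newer; unknown nodes are appended.
func (c cursor) Update(id eventID) cursor {
	for i := range c {
		if c[i].Node == id.Node {
			if id.Seq > c[i].Seq {
				c[i] = id
			}
			return c
		}
	}
	return append(c, id)
}

// Set overwrites the entry for id.Node unconditionally; unknown nodes are appended.
func (c cursor) Set(id eventID) cursor {
	for i := range c {
		if c[i].Node == id.Node {
			c[i] = id
			return c
		}
	}
	return append(c, id)
}

// Merge applies Update for every entry of other.
func (c cursor) Merge(other cursor) cursor {
	for _, id := range other {
		c = c.Update(id)
	}
	return c
}

func main() {
	c := cursor{}
	c = c.Update(eventID{Node: 1, Seq: 5})
	c = c.Update(eventID{Node: 2, Seq: 3})
	c = c.Update(eventID{Node: 1, Seq: 4}) // older than the stored entry, ignored
	c = c.Merge(cursor{{Node: 2, Seq: 9}, {Node: 3, Seq: 1}})
	fmt.Println(c)
}
```

As with the built-in `append`, the returned cursor should always be assigned back (`c = c.Update(id)`): existing entries are modified in place, but appending a new node may allocate a new backing array.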

type EventsRequest

type EventsRequest struct {
	Realm  string        `json:"realm"`
	Topic  string        `json:"topic"`
	Offset monotime.UUID `json:"offset"`
	Limit  uint64        `json:"limit"`
	Filter []MetaFilter  `json:"filter"`
	Stream bool          `json:"stream"`
}

type ListenMap

type ListenMap map[string][]string

func (ListenMap) Clone

func (cm ListenMap) Clone() ListenMap

func (ListenMap) Has

func (cm ListenMap) Has(realm, topic string) bool

type ListenRequest

type ListenRequest struct {
	Topic string
}

type LogRecord

type LogRecord struct {
	ID   int64           `json:"id"`
	Data json.RawMessage `json:"data"`
}

type LogsRequest

type LogsRequest struct {
	Realm   string `json:"realm"`
	Service string `json:"service"`
	Offset  int64  `json:"offset"`
	Limit   uint64 `json:"limit"`
	Stream  bool   `json:"stream"`
}

type MetaField

type MetaField struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

type MetaFilter

type MetaFilter struct {
	Key string   `json:"key"`
	Any []string `json:"any"`
	Not []string `json:"not"`
}
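
The documentation does not spell out how `Any` and `Not` are evaluated. The sketch below shows one plausible reading, stated as an assumption: a filter passes when the event's value for `Key` is listed in `Any` (if `Any` is non-empty) and is not listed in `Not`, and an event must pass every filter. The types are local copies and `matches` is a hypothetical helper; actual filtering happens server-side.

```go
package main

import (
	"fmt"
	"slices"
)

// MetaField and MetaFilter are local copies of the package types.
type MetaField struct{ Key, Value string }

type MetaFilter struct {
	Key string
	Any []string
	Not []string
}

// matches reports whether meta passes every filter under the assumed semantics.
func matches(meta []MetaField, filters []MetaFilter) bool {
	for _, f := range filters {
		// Look up the first value stored under the filter key.
		value, found := "", false
		for _, m := range meta {
			if m.Key == f.Key {
				value, found = m.Value, true
				break
			}
		}
		// Any: when non-empty, the key must be present with a listed value.
		if len(f.Any) > 0 && (!found || !slices.Contains(f.Any, value)) {
			return false
		}
		// Not: a present key must not carry a listed value.
		if found && slices.Contains(f.Not, value) {
			return false
		}
	}
	return true
}

func main() {
	meta := []MetaField{{Key: "source", Value: "web"}}
	fmt.Println(matches(meta, []MetaFilter{{Key: "source", Any: []string{"web", "mobile"}}}))
	fmt.Println(matches(meta, []MetaFilter{{Key: "source", Not: []string{"web"}}}))
}
```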

type Node

type Node struct {
	ID          uint64     `json:"id"`          // node id in the mess
	Region      string     `json:"region"`      // node location: region
	Country     string     `json:"country"`     // node location: country
	Datacenter  string     `json:"datacenter"`  // node location: datacenter
	Addr        string     `json:"addr"`        // node address
	Bind        string     `json:"bind"`        // bind interface (if set)
	CertExpires int64      `json:"certExpires"` // certificate expiration date (unixtime)
	Services    Services   `json:"services"`    // registered services
	LastSync    int64      `json:"lastSync"`    // last sync time (unixtime)
	Publish     PublishMap `json:"publish"`     // topics published by services running on this node
	Listen      ListenMap  `json:"listen"`      // topics that services running on this node listen to

	Meta map[string]string `json:"meta"` // any additional fields
}

func (*Node) Address

func (n *Node) Address() string

func (*Node) Clone

func (n *Node) Clone() *Node

func (*Node) Location

func (n *Node) Location() string

func (*Node) ProximitySort

func (n *Node) ProximitySort(a, b *Node) int

type NodeList

type NodeList []*Node

func (NodeList) Clone added in v0.0.4

func (nl NodeList) Clone() NodeList

Clone returns a deep copy of the NodeList.

func (NodeList) Filter

func (nl NodeList) Filter(fn func(n *Node) bool) NodeList

Filter returns a NodeList holding nodes for which the provided fn returns true.

type NodeMap

type NodeMap map[uint64]*Node

func (NodeMap) Clone

func (nm NodeMap) Clone() NodeMap

func (NodeMap) Filter

func (nm NodeMap) Filter(fn func(n *Node) bool) NodeList

Filter returns a NodeList holding nodes for which the provided fn returns true.

func (NodeMap) Get

func (nm NodeMap) Get(id uint64) *Node

Get returns a Node by ID or nil if not found.

func (NodeMap) List

func (nm NodeMap) List() NodeList

List returns a NodeList holding all nodes from the map.

type NodeState

type NodeState struct {
	// Node is the current state of the requested node.
	Node *Node `json:"node,omitempty"`
	// Map is a map of all known nodes in the cluster.
	Map NodeMap `json:"map"`
}

NodeState holds information about the node and the cluster.

func (*NodeState) Clone

func (ns *NodeState) Clone() *NodeState

func (*NodeState) Filter

func (ns *NodeState) Filter(fn func(n *Node) bool) NodeList

Filter returns a NodeList holding nodes for which the provided fn returns true. The resulting slice is ordered by proximity to the current node (closest to farthest).

func (*NodeState) GetNode

func (ns *NodeState) GetNode(id uint64) *Node

func (*NodeState) NodesByProximity

func (ns *NodeState) NodesByProximity() NodeList

NodesByProximity returns a NodeList ordered by proximity to the current node, including the current node itself as the first element. If NodeState does not hold a valid Node, nil is returned.

func (*NodeState) Peers added in v0.0.4

func (ns *NodeState) Peers(service string) NodeList

Peers returns a list of nodes with services matching the specified service name. The current service (the one from the environment) is excluded. Only services in the current realm are considered. Services are returned regardless of their current availability or runtime state.

func (*NodeState) Resolve added in v0.0.5

func (ns *NodeState) Resolve(service string) NodeList

Resolve returns a list of nodes with services matching the specified service name and available at the time of the call. Only services in the current realm are considered.
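
A Resolve-like selection can be approximated with Filter and a predicate over each node's services. This is a sketch under assumptions, not the package's logic: "available" is taken to mean `Active` and not `Private`, and aliases are assumed to match as well as names. The types are trimmed local copies and `resolvable` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"slices"
)

// Service, Node and NodeList are trimmed local copies of the package types.
type Service struct {
	Name    string
	Alias   []string
	Realm   string
	Active  bool
	Private bool
}

type Node struct {
	ID       uint64
	Services []*Service
}

type NodeList []*Node

// Filter returns the nodes for which fn returns true, preserving order.
func (nl NodeList) Filter(fn func(*Node) bool) NodeList {
	var out NodeList
	for _, n := range nl {
		if fn(n) {
			out = append(out, n)
		}
	}
	return out
}

// resolvable builds a predicate selecting nodes that run a matching,
// active, non-private service in the given realm.
func resolvable(realm, service string) func(*Node) bool {
	return func(n *Node) bool {
		for _, s := range n.Services {
			if s.Realm != realm || !s.Active || s.Private {
				continue
			}
			if s.Name == service || slices.Contains(s.Alias, service) {
				return true
			}
		}
		return false
	}
}

func main() {
	nodes := NodeList{
		{ID: 1, Services: []*Service{{Name: "billing", Realm: "prod", Active: true}}},
		{ID: 2, Services: []*Service{{Name: "billing", Realm: "prod"}}},
	}
	for _, n := range nodes.Filter(resolvable("prod", "billing")) {
		fmt.Println(n.ID)
	}
}
```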

type PublishMap

type PublishMap map[string]map[string]monotime.UUID

func (PublishMap) Clone

func (pm PublishMap) Clone() PublishMap

func (PublishMap) Has

func (pm PublishMap) Has(realm, topic string) bool

type PublishRequest

type PublishRequest struct {
	Topic  string            `json:"topic"`
	Meta   []MetaField       `json:"meta"`
	Events []json.RawMessage `json:"events"`
}

type RoundTripper

type RoundTripper struct {
	Realm   string // specific realm
	Node    uint64 // single target node
	Service string // single target service
	http.RoundTripper
}

func NewCustomTransport

func NewCustomTransport(node uint64, realm string, service string) RoundTripper

NewCustomTransport creates a custom transport that can target a specific realm, node, and/or service. An empty realm is treated as the current realm. Since the mess proxy supports h2c, NewCustomTransport creates an http2.Transport.

func NewTransport

func NewTransport() RoundTripper

func (RoundTripper) RoundTrip

func (r RoundTripper) RoundTrip(request *http.Request) (*http.Response, error)

type Service

type Service struct {
	Name    string   `json:"name"`    // service name
	Alias   []string `json:"alias"`   // service aliases
	Realm   string   `json:"realm"`   // service realm (namespace)
	Manual  bool     `json:"manual"`  // no auto-start, no auto-restart
	Active  bool     `json:"active"`  // is running
	Private bool     `json:"private"` // no incoming routing, no resolve
	Start   string   `json:"start"`   // binary to start
	Order   int      `json:"order"`   // start order
	Args    []string `json:"args"`    // command-line arguments
	Env     []string `json:"env"`     // env variables
	Listen  string   `json:"listen"`  // listens for HTTP on (127.0.0.1:8080 or /path/to/my.sock)
	Proxy   string   `json:"proxy"`   // in production: proxy network ("tcp" or "unix"); in dev: port or socket file
	Timeout int      `json:"timeout"` // shutdown timeout (to wait before kill, zero means wait indefinitely)

	Meta map[string]string `json:"meta"` // any additional fields
}
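
The Listen field accepts either a host:port pair or a socket path. A service that reads the same value could pick the network type as sketched below; treating any value that starts with "/" as a Unix socket path is an assumption, and `listenNetwork` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// listenNetwork guesses the network type for a Listen value.
// Assumption: absolute paths are Unix sockets, everything else is TCP.
func listenNetwork(listen string) string {
	if strings.HasPrefix(listen, "/") {
		return "unix"
	}
	return "tcp"
}

func main() {
	for _, addr := range []string{"127.0.0.1:8080", "/path/to/my.sock"} {
		fmt.Println(listenNetwork(addr), addr)
	}
}
```

The result can be passed straight to the standard library, for example `net.Listen(listenNetwork(addr), addr)`.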

func (*Service) Clone

func (s *Service) Clone() *Service

func (*Service) HasAlias added in v0.0.5

func (s *Service) HasAlias(alias string) bool

type Services

type Services []*Service

func (Services) Clone

func (s Services) Clone() Services

func (Services) Filter added in v0.0.4

func (s Services) Filter(fn func(*Service) bool) Services

type SubscribeRequest

type SubscribeRequest struct {
	Topic  string       `json:"topic"`
	Cursor EventCursor  `json:"cursor"`
	Filter []MetaFilter `json:"filter"`
}

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription represents a persistent event subscription on a single topic.

func (*Subscription) Close

func (s *Subscription) Close() error

Close closes the underlying cursor store. Close blocks until any active Receive calls return. Close must not be called inside a handler provided to Receive.

func (*Subscription) Cursor

func (s *Subscription) Cursor() EventCursor

Cursor returns a snapshot of the current global position of the subscription. The returned cursor can be used on any node to resume event processing from the current position of this subscription.

func (*Subscription) Receive

func (s *Subscription) Receive(ctx context.Context, filter []MetaFilter, handler func(*Event) (bool, error)) (err error)

Receive dispatches new events to the provided handler. The optional filter is used for server-side filtering of incoming events.

Receive blocks until one of the following happens:

  • the handler returns false, indicating that processing should stop;
  • the handler returns a non-nil error, which is then returned by Receive;
  • the context is canceled or reaches its deadline.

Any panic in the handler is recovered and returned as an error.

Subsequent calls to Receive on the same Subscription block until the previous call has returned.
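
The "panic is recovered and returned as an error" behavior follows a common Go pattern: a deferred function calls `recover` and writes to the named results. The sketch below shows that pattern in isolation; `safeCall`, the error text, and the local `Event` type are hypothetical, and the package's own error wrapping may differ.

```go
package main

import (
	"fmt"
)

// Event is a trimmed local stand-in for the package's Event type.
type Event struct {
	ID string
}

// safeCall invokes handler and converts a panic into an error.
func safeCall(handler func(*Event) (bool, error), e *Event) (ok bool, err error) {
	defer func() {
		if r := recover(); r != nil {
			// Overwrite the named results so the caller sees a failure.
			ok, err = false, fmt.Errorf("handler panic: %v", r)
		}
	}()
	return handler(e)
}

func main() {
	ok, err := safeCall(func(*Event) (bool, error) {
		panic("boom")
	}, &Event{ID: "1"})
	fmt.Println(ok, err)
}
```

Even with recovery in place, a handler that panics stops Receive, so the event that caused the panic is presumably delivered again on the next call.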

func (*Subscription) Topic

func (s *Subscription) Topic() string

Topic returns the subscription topic.

func (*Subscription) Update added in v0.0.7

func (s *Subscription) Update(uuid monotime.UUID) error

Update updates the cursor entry for the node encoded in the provided uuid if it is newer than the existing value associated with that node. If the node is not yet present in the cursor, uuid is appended.

This operation cannot be rolled back and must be used with caution.

It does not interrupt any active handlers and does not shift their positions.

Directories

Path      Synopsis
cmd/mess  command
cmd/node  command
