Documentation ¶
Index ¶
- Constants
- Variables
- func DialContext(ctx context.Context, _ string, _ string) (net.Conn, error)
- func DialH2CContext(ctx context.Context, _, _ string, _ *tls.Config) (net.Conn, error)
- func NewClient() *http.Client
- func NewCustomClient(node uint64, realm string, service string) *http.Client
- type API
- func (a API) CreateSubscription(filename string, topic string, cursor EventCursor) (*Subscription, error)
- func (a API) Emit(ctx context.Context, req EmitRequest) error
- func (a API) Listen(ctx context.Context, topic string) (<-chan *Event, error)
- func (a API) NextNamedSequence(ctx context.Context, name string) (uint64, error)
- func (a API) NextSequence(ctx context.Context) (uint64, error)
- func (a API) OpenSubscription(topic string, filename string) (*Subscription, error)
- func (a API) Peers(ctx context.Context, service string) (NodeList, error)
- func (a API) Publish(ctx context.Context, req PublishRequest) ([]monotime.UUID, error)
- func (a API) Resolve(ctx context.Context, service string) (NodeList, error)
- func (a API) State(ctx context.Context) (*NodeState, error)
- func (a API) Subscribe(ctx context.Context, req SubscribeRequest, handler func(*Event) (bool, error)) error
- type EmitRequest
- type Environment
- type Event
- type EventCursor
- func (ec EventCursor) Clone() EventCursor
- func (ec EventCursor) ExtractNode(id uint64) monotime.UUID
- func (ec EventCursor) Merge(cursor EventCursor) EventCursor
- func (ec EventCursor) Nodes() []uint64
- func (ec EventCursor) Set(uuid monotime.UUID) EventCursor
- func (ec EventCursor) Update(uuid monotime.UUID) EventCursor
- type EventsRequest
- type ListenMap
- type ListenRequest
- type LogRecord
- type LogsRequest
- type MetaField
- type MetaFilter
- type Node
- type NodeList
- type NodeMap
- type NodeState
- func (ns *NodeState) Clone() *NodeState
- func (ns *NodeState) Filter(fn func(n *Node) bool) NodeList
- func (ns *NodeState) GetNode(id uint64) *Node
- func (ns *NodeState) NodesByProximity() NodeList
- func (ns *NodeState) Peers(service string) NodeList
- func (ns *NodeState) Resolve(service string) NodeList
- type PublishMap
- type PublishRequest
- type RoundTripper
- type Service
- type Services
- type SubscribeRequest
- type Subscription
Constants ¶
const (
EnvMode = "MESS_MODE"
EnvNodeID = "MESS_NODE_ID"
EnvRealm = "MESS_REALM"
EnvService = "MESS_SERVICE"
EnvAlias = "MESS_ALIAS"
EnvDataDir = "MESS_DATA_DIR"
EnvProxy = "MESS_PROXY"
)
const (
TargetNodeHeader = "X-Mess-Target-Node"
TargetRealmHeader = "X-Mess-Target-Realm"
TargetServiceHeader = "X-Mess-Target-Service"
CallerHeader = "X-Mess-Caller"
)
const MaxEventSize = 16 << 20
const PublicPort = 2701
const ServiceName = "mess"
Variables ¶
var (
ErrSubscriptionExists = errors.New("subscription already exists")
ErrSubscriptionNotExist = errors.New("subscription does not exist")
)
var (
ErrInterrupt = errors.New("system interrupt")
ErrInvalidNode = errors.New("invalid node")
ErrNoCertProvided = errors.New("no certificates provided")
ErrInternalEndpoint = errors.New("internal endpoint")
)
var DefaultAPI = API{Client: NewClient()}
var DefaultClient = NewClient()
Functions ¶
func DialH2CContext ¶
Types ¶
type API ¶
API provides methods to interact with the mess.
func (API) CreateSubscription ¶
func (a API) CreateSubscription(filename string, topic string, cursor EventCursor) (*Subscription, error)
CreateSubscription creates a persistent Subscription to the provided topic. Subscription automatically manages cursor persistence and resumes processing from where it left off.
The provided cursor is used as a starting position (offset). If cursor is nil or empty, the subscription starts from the most recent events.
If the file exists, an error is returned; OpenSubscription should be used to open an existing one.
func (API) NextNamedSequence ¶
NextNamedSequence returns the next sequential identifier associated with the specified name, which is globally unique across all cluster nodes. It consists of a 16-bit node ID and a 48-bit counter.
func (API) NextSequence ¶
NextSequence returns the next sequential ID, which is globally unique across all nodes in the cluster. It consists of a 16-bit node ID and a 48-bit counter.
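The 16-bit node ID plus 48-bit counter layout described for NextSequence and NextNamedSequence can be illustrated with plain bit arithmetic. The following is only a sketch: it assumes the node ID occupies the high 16 bits, which the documentation does not state, and composeSequence and splitSequence are hypothetical helpers, not part of this package.

```go
package main

import "fmt"

const (
	counterBits = 48
	counterMask = (uint64(1) << counterBits) - 1
)

// composeSequence packs a node ID and a counter into one uint64.
// Assumption: the node ID lives in the high 16 bits.
func composeSequence(node uint16, counter uint64) uint64 {
	return uint64(node)<<counterBits | counter&counterMask
}

// splitSequence reverses composeSequence.
func splitSequence(id uint64) (node uint16, counter uint64) {
	return uint16(id >> counterBits), id & counterMask
}

func main() {
	id := composeSequence(7, 42)
	node, counter := splitSequence(id)
	fmt.Println(id, node, counter)
}
```

Under this assumed layout, IDs issued by different nodes can never collide, because the node part differs even when the counters are equal.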
func (API) OpenSubscription ¶
func (a API) OpenSubscription(topic string, filename string) (*Subscription, error)
OpenSubscription opens a previously created persistent Subscription. Subscription automatically manages cursor persistence and resumes processing from where it left off.
If the file does not exist, an error is returned; CreateSubscription should be used to create a new one.
func (API) Peers ¶ added in v0.0.4
Peers returns a list of nodes with services matching the specified service name. The current service (the one making the call) is excluded. Only services in the current realm are considered. Services are returned regardless of their current availability or runtime state.
func (API) Publish ¶
Publish publishes the provided events on the specified topic. Meta fields, if specified, will be stored with each event. Publish guarantees that either all the provided events are stored or none. If a nil error is returned, all events are successfully published and the returned slice contains the IDs of the stored events.
func (API) Resolve ¶ added in v0.0.5
Resolve returns a list of nodes with services matching the specified service name and available at the time of the call. Only services in the current realm are considered.
func (API) Subscribe ¶
func (a API) Subscribe(ctx context.Context, req SubscribeRequest, handler func(*Event) (bool, error)) error
Subscribe receives events from the specified topic using the provided cursor as an offset. If the cursor is nil or empty, the subscription starts from the most recent events. An optional filter is used for server-side filtering of incoming events.
The handler is called for each event. If the handler returns false or a non-nil error, processing stops and the error, if any, is returned. In that case, the event is not considered handled and will be retried later.
Subscribe transparently reconnects with exponential backoff if needed. Connection errors do not stop processing; they are logged using the standard log package.
If the context is canceled or expires, Subscribe waits until the currently running handler returns, then stops processing and returns the context error.
Subscribe blocks until the context is canceled or expires, or the handler returns false or a non-nil error.
Subscribe does not provide cursor persistence; use CreateSubscription for that.
type EmitRequest ¶
type EmitRequest struct {
Topic string `json:"topic"`
Meta []MetaField `json:"meta"`
Events []json.RawMessage `json:"events"`
}
type Environment ¶
type Environment struct {
// Dev is true when EnvMode is "dev" or not set.
Dev bool
// Mode holds the value of the EnvMode variable.
Mode string
// NodeID is the unique identifier of the mess node, taken from EnvNodeID.
NodeID uint64
// Service holds the service name from EnvService.
Service string
// Alias holds service aliases from EnvAlias.
Alias []string
// Realm holds the realm (namespace) name from EnvRealm.
Realm string
// DataDir points to the persistent directory for service data, taken from EnvDataDir.
DataDir string
// Proxy configuration (port or socket) from EnvProxy.
Proxy string
// ProxyNetwork is a helper field that holds the network type for net.Listen.
// Typically, this field should not be used directly; prefer functions such as
// NewClient, NewTransport, NewCustomClient, or NewCustomTransport instead.
ProxyNetwork string
// ProxyAddr is a helper field that holds the address for net.Listen.
// Typically, this field should not be used directly; prefer functions such as
// NewClient, NewTransport, NewCustomClient, or NewCustomTransport instead.
ProxyAddr string
}
func Env ¶
func Env() Environment
type Event ¶
type Event struct {
// ID is the unique event identifier in UUID v7 format. Set by the node.
// It contains the event time and the ID of the node that published the event.
// This ID is guaranteed to be unique within the cluster.
ID monotime.UUID `json:"id"`
// Topic on which the event was published. Required.
Topic string `json:"topic"`
// Meta holds user-provided metadata.
Meta []MetaField `json:"meta,omitempty"`
// Data is the event payload.
Data json.RawMessage `json:"data"`
}
type EventCursor ¶
EventCursor is a vector clock that represents a per-node event position vector based on monotime UUIDs.
Each entry in the cursor stores the last observed event UUID for a specific node. The node identity is encoded inside monotime.UUID.
At most one UUID is stored per node. For each node, the UUID represents the latest known event for that node.
func (EventCursor) Clone ¶ added in v0.0.7
func (ec EventCursor) Clone() EventCursor
Clone returns a copy of the cursor.
func (EventCursor) ExtractNode ¶
func (ec EventCursor) ExtractNode(id uint64) monotime.UUID
ExtractNode returns the event ID associated with the given node ID. If the node is not present in the cursor, ExtractNode returns monotime.ZeroUUID.
func (EventCursor) Merge ¶ added in v0.0.7
func (ec EventCursor) Merge(cursor EventCursor) EventCursor
Merge merges another cursor into the current one.
For each node, the resulting cursor contains the newest event ID. If a node exists in both cursors, Update semantics are applied. Nodes present only in the provided cursor are appended.
Merge updates the cursor in-place. The returned cursor shares the same underlying array.
func (EventCursor) Nodes ¶ added in v0.0.7
func (ec EventCursor) Nodes() []uint64
Nodes returns the list of node IDs present in the cursor.
func (EventCursor) Set ¶ added in v0.0.7
func (ec EventCursor) Set(uuid monotime.UUID) EventCursor
Set overwrites the cursor entry for the node encoded in the uuid. If the node is not present, the provided uuid is appended.
Set updates the cursor in-place. The returned cursor shares the same underlying array.
func (EventCursor) Update ¶
func (ec EventCursor) Update(uuid monotime.UUID) EventCursor
Update updates the cursor entry for the node encoded in the provided uuid if it is newer than the existing value associated with that node. If the node is not yet present in the cursor, uuid is appended.
Update updates the cursor in-place. The returned cursor shares the same underlying array.
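The Update and Merge semantics described above (at most one entry per node, newest entry wins, unknown nodes are appended) can be sketched with a simplified cursor. The entry type below is a hypothetical stand-in for monotime.UUID that exposes the node ID and a timestamp directly; the real type encodes both inside the UUID.

```go
package main

import "fmt"

// entry is a stand-in for monotime.UUID: a node ID plus an event time.
type entry struct {
	node uint64
	ts   int64
}

// cursor holds at most one entry per node.
type cursor []entry

// update replaces the entry for e.node only if e is newer;
// if the node is not present, e is appended.
func (c cursor) update(e entry) cursor {
	for i := range c {
		if c[i].node == e.node {
			if e.ts > c[i].ts {
				c[i] = e
			}
			return c
		}
	}
	return append(c, e)
}

// merge applies update semantics for every entry of other.
func (c cursor) merge(other cursor) cursor {
	for _, e := range other {
		c = c.update(e)
	}
	return c
}

func main() {
	a := cursor{{node: 1, ts: 10}, {node: 2, ts: 5}}
	b := cursor{{node: 2, ts: 9}, {node: 3, ts: 1}}
	a = a.merge(b)
	fmt.Println(a)
}
```

As with append, the sketch relies on the caller keeping the returned value, since adding an entry for a new node may grow the slice.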
type EventsRequest ¶
type ListenRequest ¶
type ListenRequest struct {
Topic string
}
type LogRecord ¶
type LogRecord struct {
ID int64 `json:"id"`
Data json.RawMessage `json:"data"`
}
type LogsRequest ¶
type MetaFilter ¶
type Node ¶
type Node struct {
ID uint64 `json:"id"` // node id in the mess
Region string `json:"region"` // node location: region
Country string `json:"country"` // node location: country
Datacenter string `json:"datacenter"` // node location: datacenter
Addr string `json:"addr"` // node address
Bind string `json:"bind"` // bind interface (if set)
CertExpires int64 `json:"certExpires"` // certificate expiration date (unixtime)
Services Services `json:"services"` // registered services
LastSync int64 `json:"lastSync"` // last sync time (unixtime)
Publish PublishMap `json:"publish"` // topics published by services running on this node
Listen ListenMap `json:"listen"` // topics that services running on this node listen to
Meta map[string]string `json:"meta"` // any additional fields
}
func (*Node) ProximitySort ¶
type NodeList ¶
type NodeList []*Node
type NodeMap ¶
func (NodeMap) Filter ¶
Filter returns a NodeList holding nodes for which the provided fn returns true.
type NodeState ¶
type NodeState struct {
// Node is the current state of the requested node.
Node *Node `json:"node,omitempty"`
// Map is a map of all known nodes in the cluster.
Map NodeMap `json:"map"`
}
NodeState holds information about the node and the cluster.
func (*NodeState) Filter ¶
Filter returns a NodeList holding nodes for which the provided fn returns true. The resulting slice is ordered by proximity to the current node (closest to farthest).
func (*NodeState) NodesByProximity ¶
NodesByProximity returns a NodeList ordered by proximity to the current node, including the current node itself as the first element. If NodeState does not hold a valid Node, nil is returned.
func (*NodeState) Peers ¶ added in v0.0.4
Peers returns a list of nodes with services matching the specified service name. The current service (the one from the environment) is excluded. Only services in the current realm are considered. Services are returned regardless of their current availability or runtime state.
type PublishMap ¶
func (PublishMap) Clone ¶
func (pm PublishMap) Clone() PublishMap
func (PublishMap) Has ¶
func (pm PublishMap) Has(realm, topic string) bool
type PublishRequest ¶
type PublishRequest struct {
Topic string `json:"topic"`
Meta []MetaField `json:"meta"`
Events []json.RawMessage `json:"events"`
}
type RoundTripper ¶
type RoundTripper struct {
Realm string // specific realm
Node uint64 // single target node
Service string // single target service
http.RoundTripper
}
func NewCustomTransport ¶
func NewCustomTransport(node uint64, realm string, service string) RoundTripper
NewCustomTransport creates a custom transport that can target a specific realm, node, and service. An empty realm is treated as the current realm. Since the mess proxy supports h2c, NewCustomTransport creates an http2.Transport.
func NewTransport ¶
func NewTransport() RoundTripper
type Service ¶
type Service struct {
Name string `json:"name"` // service name
Alias []string `json:"alias"` // service aliases
Realm string `json:"realm"` // service realm (namespace)
Manual bool `json:"manual"` // no auto-start, no auto-restart
Active bool `json:"active"` // is running
Private bool `json:"private"` // no incoming routing, no resolve
Start string `json:"start"` // binary to start
Order int `json:"order"` // start order
Args []string `json:"args"` // command-line arguments
Env []string `json:"env"` // env variables
Listen string `json:"listen"` // listens HTTP on (127.0.0.1:8080 or /path/to/my.sock)
Proxy string `json:"proxy"` // in production: proxy network ("tcp" or "unix"); in dev: port or socket file
Timeout int `json:"timeout"` // shutdown timeout (to wait before kill, zero means wait indefinitely)
Meta map[string]string `json:"meta"` // any additional fields
}
type SubscribeRequest ¶
type SubscribeRequest struct {
Topic string `json:"topic"`
Cursor EventCursor `json:"cursor"`
Filter []MetaFilter `json:"filter"`
}
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription represents a persistent event subscription on a single topic.
func (*Subscription) Close ¶
func (s *Subscription) Close() error
Close closes the underlying cursor store. Close blocks until any active Receive calls return. Close must not be called inside a handler provided to Receive.
func (*Subscription) Cursor ¶
func (s *Subscription) Cursor() EventCursor
Cursor returns a snapshot of the current global position of the subscription. The returned cursor can be used on any node to resume event processing from the current position of this subscription.
func (*Subscription) Receive ¶
func (s *Subscription) Receive(ctx context.Context, filter []MetaFilter, handler func(*Event) (bool, error)) (err error)
Receive dispatches new events to the provided handler. The optional filter is used for server-side filtering of incoming events.
Receive blocks until one of the following happens:
- the handler returns false, indicating that processing should stop;
- the handler returns a non-nil error, which is then returned by Receive;
- the context is canceled or reaches its deadline.
Any panic in the handler is recovered and returned as an error.
Subsequent calls to Receive on the same Subscription block until the previous call has returned.
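Recovering a handler panic and returning it as an error, as Receive is documented to do, is commonly written in Go with a deferred recover that assigns to named results. The sketch below shows that general pattern only; safeCall is a hypothetical helper and the error message format is an assumption, not the library's actual output.

```go
package main

import "fmt"

// safeCall runs h and converts a panic into an error.
// The deferred function overwrites the named results when a panic occurs.
func safeCall(h func() (bool, error)) (ok bool, err error) {
	defer func() {
		if r := recover(); r != nil {
			ok, err = false, fmt.Errorf("handler panic: %v", r)
		}
	}()
	return h()
}

func main() {
	ok, err := safeCall(func() (bool, error) {
		panic("unexpected state")
	})
	fmt.Println(ok, err)
}
```

With this pattern a misbehaving handler stops processing through the normal error path instead of crashing the process.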
func (*Subscription) Topic ¶
func (s *Subscription) Topic() string
Topic returns the subscription topic.
func (*Subscription) Update ¶ added in v0.0.7
func (s *Subscription) Update(uuid monotime.UUID) error
Update updates the cursor entry for the node encoded in the provided uuid if it is newer than the existing value associated with that node. If the node is not yet present in the cursor, uuid is appended.
This operation cannot be rolled back and must be used with caution.
It does not interrupt any active handlers and does not shift their positions.