ingestion

package
v0.0.0-...-b6c7913 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2025 License: MIT Imports: 40 Imported by: 0

README

Ingestion Package

The ingestion package implements a comprehensive video stream ingestion system for the Mirror platform. It supports multiple protocols (SRT, RTP), automatic codec detection, intelligent buffering, and advanced video processing capabilities.

Table of Contents

Overview

The ingestion system provides:

  • Multi-protocol support (SRT, RTP) with unified interface
  • Automatic codec detection (H.264, HEVC, AV1, JPEG-XS)
  • Video-aware buffering with GOP management
  • Frame-perfect synchronization for A/V alignment
  • Intelligent backpressure control
  • Automatic recovery and reconnection
  • Comprehensive metrics and monitoring

Architecture

System Architecture
┌─────────────────┐     ┌─────────────────┐
│   SRT Client    │     │   RTP Client    │
└────────┬────────┘     └────────┬────────┘
         │                       │
    ┌────▼────────┐         ┌───▼─────────┐
    │ SRT Listener│         │ RTP Listener│
    └────┬────────┘         └───┬─────────┘
         │                      │
    ┌────▼──────────────────────▼────────┐
    │      Connection Adapter Layer      │
    └────────────────┬───────────────────┘
                     │
    ┌────────────────▼───────────────────┐
    │      Stream Handler Manager        │
    └────────────────┬───────────────────┘
                     │
    ┌────────────────▼───────────────────┐
    │      Video Processing Pipeline     │
    ├────────────────────────────────────┤
    │ • Codec Detection                  │
    │ • Frame Assembly                   │
    │ • GOP Buffering                    │
    │ • A/V Synchronization              │
    └────────────────┬───────────────────┘
                     │
    ┌────────────────▼───────────────────┐
    │         Output Queue               │
    └────────────────────────────────────┘
Component Overview
  • Manager: Central orchestrator for all ingestion operations
  • Protocol Listeners: SRT and RTP protocol implementations
  • Stream Handler: Manages individual stream processing
  • Video Pipeline: Processes video through multiple stages
  • Buffer System: Efficient memory management with backpressure
  • Registry: Tracks active streams with Redis backend

Features

Stream Management
  • Automatic stream detection from protocol metadata
  • Connection limiting with configurable maximums
  • Stream lifecycle management (create, pause, resume, terminate)
  • Health monitoring with heartbeat tracking
  • Graceful shutdown with stream draining
Video Processing
  • Codec-agnostic design with pluggable depacketizers
  • Frame boundary detection for clean cuts
  • GOP structure preservation for quality switching
  • B-frame reordering support
  • Timestamp management with wraparound handling
Reliability
  • Automatic reconnection with exponential backoff
  • Circuit breaker pattern for failing streams
  • Memory protection with global and per-stream limits
  • Packet loss recovery for RTP streams
  • Graceful degradation under load

Protocols

SRT (Secure Reliable Transport)
// SRT configuration
type SRTConfig struct {
    Port            int           // Listen port (default: 30000)
    Latency         time.Duration // Target latency (default: 120ms)
    MaxBandwidth    int64         // Max bandwidth per stream
    PayloadSize     int           // MTU-friendly packet size
    FlowWindow      int           // Flow control window
    PeerIdleTimeout time.Duration // Disconnect idle peers
}

// Usage example
srtListener := srt.NewListener(srtConfig, registry, bufferPool, logger)
if err := srtListener.Start(); err != nil {
    log.Fatal("Failed to start SRT listener:", err)
}
SRT Features
  • Built-in encryption with passphrase support
  • Bandwidth estimation and congestion control
  • Stream ID routing for multi-stream support
  • Low latency mode for live streaming
  • Connection statistics and monitoring
RTP (Real-time Transport Protocol)
// RTP configuration
type RTPConfig struct {
    Port         int           // RTP port (default: 5004)
    RTCPPort     int           // RTCP port (default: 5005)
    BufferSize   int           // UDP buffer size
    MaxSessions  int           // Max concurrent sessions
    SessionTimeout time.Duration // Idle session timeout
}

// Usage example
rtpListener := rtp.NewListener(rtpConfig, registry, bufferPool, logger)
if err := rtpListener.Start(); err != nil {
    log.Fatal("Failed to start RTP listener:", err)
}
RTP Features
  • SSRC-based stream identification
  • Sequence number validation
  • Jitter buffer management
  • RTCP support (planned)
  • Multi-codec support via payload type

Video Processing Pipeline

Pipeline Stages
// Video pipeline processes data through stages
type VideoPipeline struct {
    codec      codec.Detector
    assembler  frame.Assembler
    detector   frame.Detector
    gopBuffer  gop.Buffer
    syncMgr    sync.Manager
    output     chan<- interface{}
}

// Processing flow
func (p *VideoPipeline) Process(packet Packet) error {
    // 1. Detect codec
    codecType := p.codec.Detect(packet)
    
    // 2. Assemble frame
    frame, complete := p.assembler.AddPacket(packet)
    if !complete {
        return nil // Wait for more packets
    }
    
    // 3. Detect frame boundaries
    frameInfo := p.detector.Analyze(frame)
    
    // 4. Buffer in GOP
    p.gopBuffer.AddFrame(frame, frameInfo)
    
    // 5. Apply A/V sync
    if synced := p.syncMgr.ProcessFrame(frame); synced != nil {
        p.output <- synced
    }
    
    return nil
}
Codec Support
H.264/AVC
// H.264 specific handling
type H264Depacketizer struct {
    // NAL unit assembly
    nalBuffer []byte
    // SPS/PPS tracking
    sps, pps []byte
}

// Features:
// - Annex B and AVCC format support
// - SPS/PPS extraction and caching
// - Access unit boundary detection
// - SEI message parsing
HEVC/H.265
// HEVC specific handling
type HEVCDepacketizer struct {
    // VPS/SPS/PPS tracking
    vps, sps, pps []byte
    // Temporal layer support
    temporalID int
}

// Features:
// - Multi-layer support
// - HDR metadata preservation
// - Temporal scalability
// - Efficient NAL unit handling
AV1
// AV1 specific handling
type AV1Depacketizer struct {
    // OBU parsing
    sequenceHeader []byte
    // Temporal unit assembly
    temporalUnit [][]byte
}

// Features:
// - OBU (Open Bitstream Unit) parsing
// - Scalability support
// - Film grain synthesis data
// - Screen content coding tools

Buffer Management

Ring Buffer System
// Ring buffer for efficient streaming
type RingBuffer struct {
    data     []byte
    size     int64
    writePos int64
    readPos  int64
    
    // Metrics
    drops    int64
    latency  time.Duration
}

// Features:
// - Zero-copy operations where possible
// - Automatic overflow handling
// - Configurable size limits
// - Real-time metrics
Buffer Pool
// Reusable buffer allocation
type BufferPool struct {
    buffers sync.Pool
    size    int
    
    // Metrics
    allocated int64
    reused    int64
}

// Usage
buffer := pool.Get()
defer pool.Put(buffer)

// Write data
n, err := buffer.Write(data)

// Read data
data, err := buffer.Read(size)
Backpressure Control
// Backpressure controller
type BackpressureController struct {
    highWatermark float64 // Trigger backpressure
    lowWatermark  float64 // Release backpressure
    
    // Current state
    pressure     float64
    dropping     bool
}

// Decision making
func (c *BackpressureController) ShouldDrop(priority int) bool {
    if c.pressure > c.highWatermark {
        c.dropping = true
    } else if c.pressure < c.lowWatermark {
        c.dropping = false
    }
    
    // Drop low priority frames first
    return c.dropping && priority < CriticalPriority
}

API Endpoints

Stream Management
List Streams - GET /api/v1/streams
{
  "streams": [
    {
      "id": "stream_123",
      "type": "srt",
      "source_addr": "192.168.1.100:42000",
      "status": "active",
      "video_codec": "hevc",
      "resolution": "1920x1080",
      "bitrate": 5000000,
      "frame_rate": 30.0
    }
  ],
  "count": 1,
  "time": "2024-01-20T15:30:45Z"
}
Get Stream - GET /api/v1/streams/{id}
{
  "id": "stream_123",
  "type": "srt",
  "source_addr": "192.168.1.100:42000",
  "status": "active",
  "created_at": "2024-01-20T15:25:00Z",
  "last_heartbeat": "2024-01-20T15:30:40Z",
  "video_codec": "hevc",
  "resolution": "1920x1080",
  "bitrate": 5000000,
  "frame_rate": 30.0,
  "stats": {
    "bytes_received": 150000000,
    "packets_received": 100000,
    "packets_lost": 5,
    "bitrate": 5000000
  }
}
Stream Control
  • Pause Stream - POST /api/v1/streams/{id}/pause
  • Resume Stream - POST /api/v1/streams/{id}/resume
  • Delete Stream - DELETE /api/v1/streams/{id}
Statistics
System Stats - GET /api/v1/stats
{
  "total_streams": 25,
  "active_streams": 23,
  "total_bitrate": 125000000,
  "cpu_usage": 45.2,
  "memory_usage": 3221225472,
  "uptime": 86400
}
Video Stats - GET /api/v1/streams/stats/video
{
  "streams": {
    "stream_123": {
      "frames_assembled": 180000,
      "frames_dropped": 10,
      "keyframes": 600,
      "p_frames": 120000,
      "b_frames": 59400,
      "gop_count": 600,
      "avg_gop_size": 300
    }
  }
}

Configuration

Complete Configuration Example
ingestion:
  # Connection limits
  max_connections: 25
  stream_timeout: 30s
  reconnect_interval: 5s
  
  # SRT configuration
  srt:
    enabled: true
    port: 30000
    latency: 120ms
    max_bandwidth: 60000000    # 60 Mbps
    payload_size: 1316         # MTU-friendly
    flow_window: 25600
    peer_idle_timeout: 30s
    
  # RTP configuration
  rtp:
    enabled: true
    port: 5004
    rtcp_port: 5005
    buffer_size: 2097152       # 2MB
    max_sessions: 30
    session_timeout: 30s
    
  # Buffer configuration
  buffer:
    ring_size: 4194304         # 4MB per stream
    pool_size: 30              # Pre-allocate for 30 streams
    write_timeout: 100ms
    read_timeout: 100ms
    
  # Memory management
  memory:
    global_limit: 8589934592   # 8GB total
    per_stream_limit: 419430400 # 400MB per stream
    gc_interval: 1m
    
  # Backpressure control
  backpressure:
    enabled: true
    high_watermark: 0.8        # 80% full
    low_watermark: 0.6         # 60% full
    drop_strategy: "oldest"    # or "lowest_priority"
    
  # Frame processing
  frame:
    assembly_timeout: 5s
    max_frame_size: 10485760   # 10MB
    reorder_buffer: 100        # packets
    
  # GOP management
  gop:
    buffer_count: 3            # Keep 3 GOPs
    max_gop_size: 300          # frames
    force_idr_interval: 10s    # Force IDR if missing
    
  # A/V synchronization
  sync:
    enabled: true
    threshold: 100ms           # Sync threshold
    correction_rate: 0.1       # 10% correction per second
    max_drift: 1s              # Max allowed drift

Best Practices

1. Stream Handling
// DO: Use context for cancellation
func (h *StreamHandler) Start(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case packet := <-h.input:
            if err := h.process(packet); err != nil {
                h.logger.WithError(err).Error("Processing failed")
                // Continue processing other packets
            }
        }
    }
}

// DON'T: Block on processing
func (h *StreamHandler) BadStart() error {
    for packet := range h.input {
        // This can't be cancelled!
        h.process(packet)
    }
}
2. Memory Management
// DO: Use buffer pools
func (h *Handler) ProcessFrame(data []byte) error {
    buf := h.bufferPool.Get()
    defer h.bufferPool.Put(buf)
    
    // Use buffer
    buf.Write(data)
    return h.pipeline.Process(buf)
}

// DON'T: Allocate per frame
func (h *Handler) BadProcessFrame(data []byte) error {
    // Allocates new buffer every time!
    buf := make([]byte, len(data))
    copy(buf, data)
    return h.pipeline.Process(buf)
}
3. Error Recovery
// DO: Implement circuit breaker
type StreamCircuitBreaker struct {
    failures    int
    maxFailures int
    resetAfter  time.Duration
    lastFailure time.Time
    state       State
}

func (cb *StreamCircuitBreaker) Call(fn func() error) error {
    if cb.state == Open {
        if time.Since(cb.lastFailure) > cb.resetAfter {
            cb.state = HalfOpen
        } else {
            return ErrCircuitOpen
        }
    }
    
    err := fn()
    if err != nil {
        cb.recordFailure()
    } else {
        cb.reset()
    }
    
    return err
}
4. Monitoring
// DO: Track comprehensive metrics
type StreamMetrics struct {
    // Counters
    packetsReceived  prometheus.Counter
    framesAssembled  prometheus.Counter
    framesDropped    prometheus.Counter
    
    // Gauges
    activeStreams    prometheus.Gauge
    bufferUsage      prometheus.Gauge
    
    // Histograms
    frameLatency     prometheus.Histogram
    processingTime   prometheus.Histogram
}

// Update metrics
func (m *StreamMetrics) RecordFrame(frame Frame, duration time.Duration) {
    m.framesAssembled.Inc()
    m.frameLatency.Observe(frame.Latency.Seconds())
    m.processingTime.Observe(duration.Seconds())
}

Testing

Unit Tests
func TestFrameAssembler(t *testing.T) {
    assembler := frame.NewAssembler(frame.Config{
        Timeout: 5 * time.Second,
        MaxSize: 1024 * 1024,
    })
    
    // Test single packet frame
    packet := &rtp.Packet{
        Header: rtp.Header{
            Marker:        true,
            SequenceNumber: 1,
        },
        Payload: []byte{0x01, 0x02, 0x03},
    }
    
    frame, complete := assembler.AddPacket(packet)
    assert.True(t, complete)
    assert.Equal(t, []byte{0x01, 0x02, 0x03}, frame.Data)
    
    // Test multi-packet frame
    packets := createMultiPacketFrame(t)
    var finalFrame *Frame
    
    for _, pkt := range packets {
        frame, complete = assembler.AddPacket(pkt)
        if complete {
            finalFrame = frame
            break
        }
    }
    
    assert.NotNil(t, finalFrame)
    assert.Equal(t, expectedData, finalFrame.Data)
}
Integration Tests
func TestSRTIngestion(t *testing.T) {
    // Start test SRT server
    manager := setupTestManager(t)
    defer manager.Stop()
    
    // Connect SRT client
    client, err := srt.Dial("srt://localhost:30000", srt.Config{
        StreamID: "test_stream",
        Latency:  120 * time.Millisecond,
    })
    require.NoError(t, err)
    defer client.Close()
    
    // Send test data
    testData := generateTestMPEGTS(t)
    _, err = client.Write(testData)
    require.NoError(t, err)
    
    // Wait for processing
    time.Sleep(100 * time.Millisecond)
    
    // Verify stream registered
    stream, err := manager.GetStream(context.Background(), "test_stream")
    require.NoError(t, err)
    assert.Equal(t, "active", stream.Status)
    assert.Equal(t, "h264", stream.VideoCodec)
}
Stress Tests
func TestIngestionStress(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping stress test")
    }
    
    manager := setupTestManager(t)
    defer manager.Stop()
    
    // Start multiple streams
    const numStreams = 25
    clients := make([]*srt.Client, numStreams)
    
    for i := 0; i < numStreams; i++ {
        client, err := srt.Dial("srt://localhost:30000", srt.Config{
            StreamID: fmt.Sprintf("stream_%d", i),
        })
        require.NoError(t, err)
        clients[i] = client
        
        // Start sending data
        go sendContinuousData(t, client, 50*1024*1024/8) // 50 Mbps
    }
    
    // Run for 1 minute
    time.Sleep(1 * time.Minute)
    
    // Check metrics
    stats := manager.GetStats(context.Background())
    assert.Equal(t, numStreams, stats.ActiveStreams)
    assert.Zero(t, stats.FailedStreams)
    
    // Clean up
    for _, client := range clients {
        client.Close()
    }
}

Examples

Complete Ingestion Setup
// main.go
func setupIngestion(cfg *config.Config, logger logger.Logger) (*ingestion.Manager, error) {
    // Create Redis client for registry
    redisClient := redis.NewClient(&redis.Options{
        Addr: cfg.Redis.Addr,
    })
    
    // Create ingestion manager
    manager := ingestion.NewManager(cfg.Ingestion, redisClient, logger)
    
    // Register health checks
    healthMgr.Register(&ingestion.HealthChecker{
        Manager: manager,
    })
    
    // Start ingestion
    if err := manager.Start(context.Background()); err != nil {
        return nil, fmt.Errorf("failed to start ingestion: %w", err)
    }
    
    // Setup metrics export
    go manager.ExportMetrics(metricsRegistry)
    
    return manager, nil
}
Custom Stream Handler
// Implement custom processing logic
type CustomStreamHandler struct {
    *ingestion.StreamHandler
    
    // Custom fields
    transcoder *ffmpeg.Transcoder
    analyzer   *video.Analyzer
}

func (h *CustomStreamHandler) ProcessFrame(frame *Frame) error {
    // Analyze frame
    analysis := h.analyzer.Analyze(frame)
    
    // Make transcoding decisions
    if analysis.SceneChange {
        h.transcoder.ForceKeyframe()
    }
    
    // Forward to base handler
    return h.StreamHandler.ProcessFrame(frame)
}

// Register custom handler
manager.RegisterHandlerFactory(func(config StreamConfig) StreamHandler {
    return &CustomStreamHandler{
        StreamHandler: ingestion.NewStreamHandler(config),
        transcoder:    ffmpeg.NewTranscoder(config),
        analyzer:      video.NewAnalyzer(),
    }
})

Documentation

Index

Constants

View Source
const (
	// DefaultMaxMemory is the default total memory limit (2.5GB)
	DefaultMaxMemory = int64(2684354560)
	// DefaultMaxPerStream is the default per-stream memory limit (200MB)
	DefaultMaxPerStream = int64(209715200)
	// DefaultBitrate is the default stream bitrate (50 Mbps)
	DefaultBitrate = int64(52428800)
)

Variables

This section is empty.

Functions

func DetectCodecFromRTPSession

func DetectCodecFromRTPSession(session interface {
	GetPayloadType() uint8
	GetMediaFormat() string
	GetEncodingName() string
	GetClockRate() uint32
},
) types.CodecType

DetectCodecFromRTPSession is a helper that uses session metadata

Types

type CodecDetector

type CodecDetector struct {
	// contains filtered or unexported fields
}

CodecDetector detects codec from various sources

func NewCodecDetector

func NewCodecDetector() *CodecDetector

NewCodecDetector creates a new codec detector

func (*CodecDetector) AddPayloadTypeMapping

func (d *CodecDetector) AddPayloadTypeMapping(payloadType uint8, codec types.CodecType)

AddPayloadTypeMapping adds a custom payload type to codec mapping

func (*CodecDetector) DetectFromRTPPacket

func (d *CodecDetector) DetectFromRTPPacket(packet *rtp.Packet) types.CodecType

DetectFromRTPPacket attempts to detect codec from RTP packet

func (*CodecDetector) DetectFromSDP

func (d *CodecDetector) DetectFromSDP(sdp string) (types.CodecType, map[string]string)

DetectFromSDP parses SDP and extracts codec information

type ConnectionStats

type ConnectionStats struct {
	PacketsLost      int64
	PacketsRetrans   int64
	RTTMs            float64
	BandwidthMbps    float64
	DeliveryDelayMs  float64
	ConnectionTimeMs int64
}

ConnectionStats contains connection-level statistics

type ConnectionStatsDTO

type ConnectionStatsDTO struct {
	PacketsLost      int64   `json:"packets_lost"`
	PacketsRetrans   int64   `json:"packets_retrans"`
	RTTMs            float64 `json:"rtt_ms"`
	BandwidthMbps    float64 `json:"bandwidth_mbps"`
	DeliveryDelayMs  float64 `json:"delivery_delay_ms"`
	ConnectionTimeMs int64   `json:"connection_time_ms"`
}

type DriftStatusDTO

type DriftStatusDTO struct {
	CurrentDrift  time.Duration `json:"current_drift"`
	MaxDrift      time.Duration `json:"max_drift"`
	MinDrift      time.Duration `json:"min_drift"`
	AvgDrift      time.Duration `json:"avg_drift"`
	SampleCount   int           `json:"sample_count"`
	LastCorrected time.Time     `json:"last_corrected,omitempty"`
}

DriftStatusDTO represents drift statistics

type ErrorResponse

type ErrorResponse struct {
	Error   string    `json:"error"`
	Message string    `json:"message"`
	Time    time.Time `json:"time"`
}

Error response

type FrameBufferStatsDTO

type FrameBufferStatsDTO struct {
	Capacity        int64   `json:"capacity"`         // Max frames in buffer
	Used            int64   `json:"used"`             // Current frames in buffer
	Available       int64   `json:"available"`        // Available frame slots
	FramesAssembled uint64  `json:"frames_assembled"` // Total frames assembled
	FramesDropped   uint64  `json:"frames_dropped"`   // Frames dropped due to pressure
	QueuePressure   float64 `json:"queue_pressure"`   // Current pressure (0-1)
	Keyframes       uint64  `json:"keyframes"`        // Number of keyframes
	PFrames         uint64  `json:"p_frames"`         // Number of P frames
	BFrames         uint64  `json:"b_frames"`         // Number of B frames
}

FrameBufferStatsDTO contains video-aware buffer statistics

type FrameData

type FrameData struct {
	Data      []byte `json:"data"`
	Timestamp int64  `json:"timestamp"`
}

FrameData contains individual frame information

type FramePreviewResponse

type FramePreviewResponse struct {
	StreamID   string      `json:"stream_id"`
	FrameCount int64       `json:"frame_count"`
	Frames     []FrameData `json:"frames"`
}

FramePreviewResponse contains frame preview data

type Handlers

type Handlers struct {
	// contains filtered or unexported fields
}

Handlers wraps the ingestion manager to provide HTTP handlers

func NewHandlers

func NewHandlers(manager *Manager, logger logger.Logger) *Handlers

NewHandlers creates a new handlers wrapper

func (*Handlers) RegisterRoutes

func (h *Handlers) RegisterRoutes(router *mux.Router)

RegisterRoutes registers all ingestion API routes

type IngestionStats

type IngestionStats struct {
	Started        bool `json:"started"`
	SRTEnabled     bool `json:"srt_enabled"`
	RTPEnabled     bool `json:"rtp_enabled"`
	SRTSessions    int  `json:"srt_sessions"`
	RTPSessions    int  `json:"rtp_sessions"`
	TotalStreams   int  `json:"total_streams"`
	ActiveHandlers int  `json:"active_handlers"` // Number of active stream handlers
}

IngestionStats holds ingestion statistics

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager coordinates all ingestion components

func NewManager

func NewManager(cfg *config.IngestionConfig, logger logger.Logger) (*Manager, error)

NewManager creates a new ingestion manager

func (*Manager) CreateStreamHandler

func (m *Manager) CreateStreamHandler(streamID string, conn StreamConnection) (*StreamHandler, error)

CreateStreamHandler creates a new stream handler with backpressure support

func (*Manager) GetActiveStreams

func (m *Manager) GetActiveStreams(ctx context.Context) ([]*registry.Stream, error)

GetActiveStreams returns all active streams from the registry

func (*Manager) GetRegistry

func (m *Manager) GetRegistry() registry.Registry

GetRegistry returns the stream registry

func (*Manager) GetStats

func (m *Manager) GetStats(ctx context.Context) IngestionStats

GetStats returns ingestion statistics

func (*Manager) GetStream

func (m *Manager) GetStream(ctx context.Context, streamID string) (*registry.Stream, error)

GetStream returns a specific stream by ID

func (*Manager) GetStreamHandler

func (m *Manager) GetStreamHandler(streamID string) (*StreamHandler, bool)

GetStreamHandler returns a stream handler by ID

func (*Manager) GetStreamHandlerAndStats

func (m *Manager) GetStreamHandlerAndStats(streamID string) (*StreamHandler, StreamStats, bool)

GetStreamHandlerAndStats atomically gets both handler and stats to prevent race conditions

func (*Manager) GetStreamHandlerWithStats

func (m *Manager) GetStreamHandlerWithStats(streamID string) (StreamStats, bool)

GetStreamHandlerWithStats atomically gets handler stats to prevent race conditions

func (*Manager) GetStreamMetrics

func (m *Manager) GetStreamMetrics(streamID string) (*StreamMetrics, error)

GetStreamMetrics returns metrics for a specific stream

func (*Manager) HandleDeleteStream

func (m *Manager) HandleDeleteStream(w http.ResponseWriter, r *http.Request)

HandleDeleteStream - DELETE /api/v1/streams/{id}

func (*Manager) HandleGetStream

func (m *Manager) HandleGetStream(w http.ResponseWriter, r *http.Request)

HandleGetStream - GET /api/v1/streams/{id}

func (*Manager) HandleListStreams

func (m *Manager) HandleListStreams(w http.ResponseWriter, r *http.Request)

HandleListStreams - GET /api/v1/streams

func (*Manager) HandlePauseStream

func (m *Manager) HandlePauseStream(w http.ResponseWriter, r *http.Request)

HandlePauseStream - POST /api/v1/streams/{id}/pause

func (*Manager) HandleRTPSession

func (m *Manager) HandleRTPSession(session *rtp.Session) error

HandleRTPSession handles a new RTP session with proper backpressure

func (*Manager) HandleRTPStreamWithBackpressure

func (m *Manager) HandleRTPStreamWithBackpressure(session *rtp.Session) error

HandleRTPStreamWithBackpressure integrates RTP session with backpressure handling

func (*Manager) HandleResumeStream

func (m *Manager) HandleResumeStream(w http.ResponseWriter, r *http.Request)

HandleResumeStream - POST /api/v1/streams/{id}/resume

func (*Manager) HandleSRTConnection

func (m *Manager) HandleSRTConnection(conn *srt.Connection) error

HandleSRTConnection handles a new SRT connection with proper backpressure

func (*Manager) HandleSRTStreamWithBackpressure

func (m *Manager) HandleSRTStreamWithBackpressure(conn *srt.Connection) error

HandleSRTStreamWithBackpressure integrates SRT connection with backpressure handling

func (*Manager) HandleStats

func (m *Manager) HandleStats(w http.ResponseWriter, r *http.Request)

HandleStats - GET /api/v1/stats

func (*Manager) HandleStreamBackpressure

func (m *Manager) HandleStreamBackpressure(w http.ResponseWriter, r *http.Request)

HandleStreamBackpressure - GET /api/v1/streams/{id}/backpressure Returns backpressure statistics and current state

func (*Manager) HandleStreamBackpressureControl

func (m *Manager) HandleStreamBackpressureControl(w http.ResponseWriter, r *http.Request)

HandleStreamBackpressureControl - POST /api/v1/streams/{id}/backpressure/control Allows manual control of backpressure settings

func (*Manager) HandleStreamBuffer

func (m *Manager) HandleStreamBuffer(w http.ResponseWriter, r *http.Request)

HandleStreamBuffer - GET /api/v1/streams/{id}/buffer Returns frame buffer statistics

func (*Manager) HandleStreamData

func (m *Manager) HandleStreamData(w http.ResponseWriter, r *http.Request)

HandleStreamData - GET /api/v1/streams/{id}/data Returns a stream of frame data (video-aware)

func (*Manager) HandleStreamIframe

func (m *Manager) HandleStreamIframe(w http.ResponseWriter, r *http.Request)

HandleStreamIframe - GET /api/v1/streams/{id}/iframe Returns the latest iframe as JPEG

func (*Manager) HandleStreamParameters

func (m *Manager) HandleStreamParameters(w http.ResponseWriter, r *http.Request)

HandleStreamParameters handles GET /api/v1/streams/{id}/parameters - Production monitoring endpoint

func (*Manager) HandleStreamPreview

func (m *Manager) HandleStreamPreview(w http.ResponseWriter, r *http.Request)

HandleStreamPreview - GET /api/v1/streams/{id}/preview Returns a preview of recent frames

func (*Manager) HandleStreamRecovery

func (m *Manager) HandleStreamRecovery(w http.ResponseWriter, r *http.Request)

HandleStreamRecovery - GET /api/v1/streams/{id}/recovery Returns error recovery statistics and current state

func (*Manager) HandleStreamStats

func (m *Manager) HandleStreamStats(w http.ResponseWriter, r *http.Request)

HandleStreamStats - GET /api/v1/streams/{id}/stats

func (*Manager) HandleStreamSync

func (m *Manager) HandleStreamSync(w http.ResponseWriter, r *http.Request)

HandleStreamSync - GET /api/v1/streams/{id}/sync Returns A/V synchronization status for a stream

func (*Manager) HandleVideoStats

func (m *Manager) HandleVideoStats(w http.ResponseWriter, r *http.Request)

HandleVideoStats - GET /api/v1/streams/stats/video

func (*Manager) MonitorSystemHealth

func (m *Manager) MonitorSystemHealth(ctx context.Context)

MonitorSystemHealth monitors overall system health and applies global backpressure if needed

func (*Manager) PauseStream

func (m *Manager) PauseStream(ctx context.Context, streamID string) error

PauseStream pauses data ingestion for a stream

func (*Manager) RemoveStreamHandler

func (m *Manager) RemoveStreamHandler(streamID string)

RemoveStreamHandler removes a stream handler

func (*Manager) ResumeStream

func (m *Manager) ResumeStream(ctx context.Context, streamID string) error

ResumeStream resumes data ingestion for a paused stream

func (*Manager) Start

func (m *Manager) Start() error

Start starts all ingestion components

func (*Manager) Stop

func (m *Manager) Stop() error

Stop stops all ingestion components

func (*Manager) TerminateStream

func (m *Manager) TerminateStream(ctx context.Context, streamID string) error

TerminateStream terminates a stream and removes it from the registry

type RTPConnection

type RTPConnection interface {
	StreamConnection
	GetBitrate() int64
	GetSSRC() uint32
	SendRTCP(rtcp.Packet) error
}

RTPConnection wraps rtp.Session to implement StreamConnection

type RTPConnectionAdapter

type RTPConnectionAdapter struct {
	*rtpPkg.Session
	// contains filtered or unexported fields
}

RTPConnectionAdapter adapts rtp.Session to implement StreamConnection and RTPConnection interfaces Now also emits TimestampedPackets for video-aware processing

func NewRTPConnectionAdapter

func NewRTPConnectionAdapter(session *rtpPkg.Session, codecType types.CodecType) *RTPConnectionAdapter

NewRTPConnectionAdapter creates a new adapter

func (*RTPConnectionAdapter) Close

func (a *RTPConnectionAdapter) Close() error

Close implements StreamConnection

func (*RTPConnectionAdapter) GetAudioOutput

func (a *RTPConnectionAdapter) GetAudioOutput() <-chan types.TimestampedPacket

GetAudioOutput returns the channel of audio TimestampedPackets

func (*RTPConnectionAdapter) GetBitrate

func (a *RTPConnectionAdapter) GetBitrate() int64

GetBitrate returns the current bitrate

func (*RTPConnectionAdapter) GetSSRC

func (a *RTPConnectionAdapter) GetSSRC() uint32

GetSSRC returns the SSRC of the stream

func (*RTPConnectionAdapter) GetStreamID

func (a *RTPConnectionAdapter) GetStreamID() string

GetStreamID implements StreamConnection

func (*RTPConnectionAdapter) GetVideoOutput

func (a *RTPConnectionAdapter) GetVideoOutput() <-chan types.TimestampedPacket

GetVideoOutput returns the channel of video TimestampedPackets

func (*RTPConnectionAdapter) ProcessPacket

func (a *RTPConnectionAdapter) ProcessPacket(rtpPkt *rtp.Packet) error

ProcessPacket converts an RTP packet to TimestampedPacket This should be called by the Session when it receives packets

func (*RTPConnectionAdapter) Read

func (a *RTPConnectionAdapter) Read(buf []byte) (int, error)

Read implements StreamConnection

func (*RTPConnectionAdapter) SendRTCP

func (a *RTPConnectionAdapter) SendRTCP(pkt rtcp.Packet) error

SendRTCP sends an RTCP packet

func (*RTPConnectionAdapter) SetDepacketizer

func (a *RTPConnectionAdapter) SetDepacketizer(depacketizer codec.Depacketizer)

SetDepacketizer sets the depacketizer for frame analysis

type SRTConnection

type SRTConnection interface {
	StreamConnection
	GetMaxBW() int64
	SetMaxBW(int64) error
}

SRTConnection wraps srt.Connection to implement StreamConnection

type SRTConnectionAdapter

type SRTConnectionAdapter struct {
	*srt.Connection
	// contains filtered or unexported fields
}

SRTConnectionAdapter adapts srt.Connection to implement StreamConnection and SRTConnection interfaces Now also parses MPEG-TS and emits TimestampedPackets with parameter set extraction

func NewSRTConnectionAdapter

func NewSRTConnectionAdapter(conn *srt.Connection) *SRTConnectionAdapter

NewSRTConnectionAdapter creates a new adapter

func (*SRTConnectionAdapter) Close

func (a *SRTConnectionAdapter) Close() error

Close implements StreamConnection

func (*SRTConnectionAdapter) GetAudioOutput

func (a *SRTConnectionAdapter) GetAudioOutput() <-chan types.TimestampedPacket

GetAudioOutput returns the channel of audio TimestampedPackets

func (*SRTConnectionAdapter) GetDetectedVideoCodec

func (a *SRTConnectionAdapter) GetDetectedVideoCodec() types.CodecType

GetDetectedVideoCodec returns the codec detected from MPEG-TS PMT

func (*SRTConnectionAdapter) GetMaxBW

func (a *SRTConnectionAdapter) GetMaxBW() int64

GetMaxBW returns the current max bandwidth setting

func (*SRTConnectionAdapter) GetParameterSetCache

func (a *SRTConnectionAdapter) GetParameterSetCache() *types.ParameterSetContext

GetParameterSetCache returns the parameter set cache for external access

func (*SRTConnectionAdapter) GetStreamID

func (a *SRTConnectionAdapter) GetStreamID() string

GetStreamID implements StreamConnection

func (*SRTConnectionAdapter) GetVideoOutput

func (a *SRTConnectionAdapter) GetVideoOutput() <-chan types.TimestampedPacket

GetVideoOutput returns the channel of video TimestampedPackets

func (*SRTConnectionAdapter) Read

func (a *SRTConnectionAdapter) Read(buf []byte) (int, error)

Read implements StreamConnection

func (*SRTConnectionAdapter) SetMaxBW

func (a *SRTConnectionAdapter) SetMaxBW(bw int64) error

SetMaxBW sets the max bandwidth for backpressure

func (*SRTConnectionAdapter) SetPIDs

func (a *SRTConnectionAdapter) SetPIDs(videoPID, audioPID uint16)

SetPIDs sets the PIDs for video and audio streams

type StreamConnection

type StreamConnection interface {
	GetStreamID() string
	Read([]byte) (int, error)
	Close() error
}

StreamConnection is a common interface for SRT and RTP connections

type StreamDTO

type StreamDTO struct {
	ID            string         `json:"id"`
	Type          string         `json:"type"`
	SourceAddr    string         `json:"source_addr"`
	Status        string         `json:"status"`
	CreatedAt     time.Time      `json:"created_at"`
	LastHeartbeat time.Time      `json:"last_heartbeat"`
	VideoCodec    string         `json:"video_codec"`
	Resolution    string         `json:"resolution"`
	Bitrate       int64          `json:"bitrate"`
	FrameRate     float64        `json:"frame_rate"`
	Stats         StreamStatsDTO `json:"stats"`
}

type StreamHandler

type StreamHandler struct {
	// contains filtered or unexported fields
}

StreamHandler handles a single stream with full video awareness

func NewStreamHandler

func NewStreamHandler(
	ctx context.Context,
	streamID string,
	conn StreamConnection,
	queue *queue.HybridQueue,
	memController *memory.Controller,
	logger logger.Logger,
) *StreamHandler

NewStreamHandler creates a new unified video-aware stream handler

func (*StreamHandler) GetDetectedResolution

func (h *StreamHandler) GetDetectedResolution() resolution.Resolution

GetDetectedResolution returns the detected resolution

func (*StreamHandler) GetFramePreview

func (h *StreamHandler) GetFramePreview(durationSeconds float64) ([]byte, int)

GetFramePreview returns a preview of recent frames

func (*StreamHandler) GetLatestIFrameWithSessionContext

func (h *StreamHandler) GetLatestIFrameWithSessionContext() (*types.VideoFrame, *types.ParameterSetContext)

GetLatestIFrameWithSessionContext returns the latest iframe with session-long parameter context This replaces the GOP buffer approach with session-scoped parameter management

func (*StreamHandler) GetSessionParameterCache

func (h *StreamHandler) GetSessionParameterCache() *types.ParameterSetContext

GetSessionParameterCache returns the session parameter cache for external access

func (*StreamHandler) GetStats

func (h *StreamHandler) GetStats() StreamStats

GetStats returns current statistics

func (*StreamHandler) GetSyncManager

func (h *StreamHandler) GetSyncManager() *isync.Manager

GetSyncManager returns the sync manager for this stream

func (*StreamHandler) RecoverFromError

func (h *StreamHandler) RecoverFromError() error

RecoverFromError attempts to recover from an error using GOP buffer

func (*StreamHandler) SeekToKeyframe

func (h *StreamHandler) SeekToKeyframe(targetPTS int64) (*types.VideoFrame, error)

SeekToKeyframe seeks to the nearest keyframe before the given timestamp

func (*StreamHandler) Start

func (h *StreamHandler) Start()

Start begins processing the stream with video awareness

func (*StreamHandler) Stop

func (h *StreamHandler) Stop() error

Stop stops the stream handler

type StreamListResponse

type StreamListResponse struct {
	Streams []StreamDTO `json:"streams"`
	Count   int         `json:"count"`
	Time    time.Time   `json:"time"`
}

API Response DTOs

type StreamMetrics

type StreamMetrics struct {
	StreamID        string  `json:"stream_id"`
	Type            string  `json:"type"`
	Status          string  `json:"status"`
	Bitrate         int64   `json:"bitrate"`
	FramesAssembled uint64  `json:"frames_assembled"`
	FramesDropped   uint64  `json:"frames_dropped"`
	QueueDepth      int64   `json:"queue_depth"`
	QueuePressure   float64 `json:"queue_pressure"`
	MemoryUsage     int64   `json:"memory_usage"`
	MemoryPressure  float64 `json:"memory_pressure"`
	CreatedAt       int64   `json:"created_at"`
	UpdatedAt       int64   `json:"updated_at"`
}

StreamMetrics contains detailed metrics for a stream

type StreamStats

type StreamStats struct {
	StreamID        string
	Codec           string
	PacketsReceived uint64
	FramesAssembled uint64
	FramesDropped   uint64
	BytesProcessed  uint64
	Errors          uint64
	Bitrate         float64 // Bits per second
	QueueDepth      int64
	QueuePressure   float64
	Backpressure    bool
	Started         bool
	PipelineStats   pipeline.PipelineStats
	// Connection-level statistics
	ConnectionStats *ConnectionStats
	// Frame type breakdown
	KeyframeCount uint64
	PFrameCount   uint64
	BFrameCount   uint64
	// Video information
	Resolution resolution.Resolution
	Framerate  float64
	// GOP statistics
	GOPStats gop.GOPStatistics
	// GOP buffer statistics
	GOPBufferStats gop.BufferStatistics
	// Backpressure statistics
	BackpressureStats backpressure.Statistics
	// Recovery statistics
	RecoveryStats recovery.Statistics
}

StreamStats contains unified statistics

type StreamStatsDTO

type StreamStatsDTO struct {
	BytesReceived    int64               `json:"bytes_received"`
	PacketsReceived  int64               `json:"packets_received"`
	PacketsLost      int64               `json:"packets_lost"`
	Bitrate          int64               `json:"bitrate"`
	FrameBufferStats FrameBufferStatsDTO `json:"frame_buffer_stats"`
	ConnectionStats  *ConnectionStatsDTO `json:"connection_stats,omitempty"`
}

type SuccessResponse

type SuccessResponse struct {
	Message   string      `json:"message"`
	Data      interface{} `json:"data,omitempty"`
	Timestamp time.Time   `json:"timestamp"`
}

SuccessResponse for simple success messages

type SyncStatusResponse

type SyncStatusResponse struct {
	StreamID string              `json:"stream_id"`
	Status   sync.SyncStatusType `json:"status"`
	Video    *TrackStatusDTO     `json:"video,omitempty"`
	Audio    *TrackStatusDTO     `json:"audio,omitempty"`
	Drift    *DriftStatusDTO     `json:"drift,omitempty"`
	Time     time.Time           `json:"time"`
}

SyncStatusResponse represents the sync status for a stream

type TrackStatusDTO

type TrackStatusDTO struct {
	Type         string    `json:"type"`
	LastPTS      int64     `json:"last_pts"`
	LastDTS      int64     `json:"last_dts"`
	BaseTime     time.Time `json:"base_time"`
	PacketCount  int64     `json:"packet_count"`
	DroppedCount int64     `json:"dropped_count"`
	WrapCount    int       `json:"wrap_count"`
	JumpCount    int       `json:"jump_count"`
	ErrorCount   int       `json:"error_count"`
}

TrackStatusDTO represents the status of a single track

type VideoAwareConnection

type VideoAwareConnection interface {
	StreamConnection
	GetVideoOutput() <-chan types.TimestampedPacket
	GetAudioOutput() <-chan types.TimestampedPacket
}

VideoAwareConnection is an interface for connections that provide video/audio channels

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL