ekanite

v0.0.0-...-2f2e688
Published: Dec 7, 2015 License: MIT Imports: 19 Imported by: 0

README

For a detailed look at the goals, design, and implementation of this project, check out this blog post.

Ekanite

Ekanite is a syslog server with built-in search. Its goal is to do one thing, and do it well: make log messages received over the network searchable. What it lacks in features, it makes up for in focus. Built in Go, it has no external dependencies, which makes deployment easy.

Features include:

  • Supports reception of log messages over UDP, TCP, and TCP with TLS.
  • Full text search of all received log messages.
  • Full parsing of RFC5424 headers.
  • Log messages are indexed by parsed timestamp, if one is available. This means search results are presented in the order the messages occurred, not in the order they were received, ensuring sensible display even with delayed senders.
  • Automatic data-retention management. Ekanite deletes indexed log data older than a configurable time period.
  • Not a JVM in sight.

Search is implemented using the bleve search library. For some performance analysis of bleve, and of the sharding techniques used by Ekanite, check out this post.

Building

Tested on 64-bit Kubuntu 14.04.

mkdir ~/ekanite # Or a directory of your choice.
cd ~/ekanite
export GOPATH=$PWD
go get github.com/ekanite/ekanite
go install github.com/ekanite/...

Running

The daemon will be located in the $GOPATH/bin directory. Execute

$ ekanited -h
    ekanite [options]
    -batchsize=300: Indexing batch size.
    -batchtime=1000: Indexing batch timeout, in milliseconds.
    -datadir="/var/opt/ekanite": Set data directory.
    -diag="": expvar and pprof bind address in the form host:port. If not set, not started.
    -maxpending=1000: Maximum pending index events.
    -noreport=false: Do not report anonymous data on launch.
    -numshards=16: Set number of shards per index.
    -query="localhost:9950": TCP Bind address for query server in the form host:port.
    -retention="168h": Data retention period. Minimum is 24 hours.
    -tcp="": Syslog server TCP bind address in the form host:port. If not set, not started.
    -udp="": Syslog server UDP bind address in the form host:port. If not set, not started.

for command-line options.

Sending logs to Ekanite

For now, for Ekanite to accept logs, your syslog client must be configured such that the log lines are RFC5424 compliant, and in the following format:

<PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROC-ID MSGID MSG

Consult the RFC to learn what each of these fields is. The TIMESTAMP field must be in RFC3339 format. Both rsyslog and syslog-ng support templating, which makes it very easy for those programs to format logs correctly and transmit the logs to Ekanite. Templates and installation instructions for both systems are below.

rsyslog

# Send messages to Ekanite over TCP using the template. Assumes Ekanite is listening on 127.0.0.1:5514
$template EkaniteFormat,"<%pri%>%protocol-version% %timestamp:::date-rfc3339% %HOSTNAME% %app-name% %procid% - %msg%"
*.*             @@127.0.0.1:5514;EkaniteFormat

Add this template to /etc/rsyslog.d/23-ekanite.conf and then restart rsyslog using the command sudo service rsyslog restart.

syslog-ng

source s_ekanite {
	system();	# Check which OS & collect system logs
	internal();	# Collect syslog-ng logs
};
template Ekanite { template("<${PRI}>1 ${ISODATE} ${HOST} ${PROGRAM} ${PID} - $MSG"); template_escape(no) };
destination d_ekanite {
	tcp("127.0.0.1" port(5514) template(Ekanite));
};

log { 
	source(s_ekanite); 
	destination(d_ekanite); 
};

Add this template to /etc/syslog-ng/syslog-ng.conf and then restart syslog-ng using the command /etc/init.d/syslog-ng restart.

With these changes in place rsyslog or syslog-ng will continue to send logs to any existing destination, and also forward the logs to Ekanite.

Searching the logs

Search support is simple at the moment. Telnet to the query server (see the command-line options) and enter a search term. The query language supported is the simple language supported by bleve, but a more sophisticated query syntax, including searching for specific field values, may be supported soon.

Below is an example search session, showing accesses to the login URL of a WordPress site. The telnet client connects to the query server and enters the string login

$ telnet 127.0.0.1 9950
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
login
<134>0 2015-05-05T23:50:17.025568+00:00 fisher apache-access - - 65.98.59.154 - - [05/May/2015:23:50:12 +0000] "GET /wp-login.php HTTP/1.0" 200 206 "-" "-"
<134>0 2015-05-06T01:24:41.232890+00:00 fisher apache-access - - 104.140.83.221 - - [06/May/2015:01:24:40 +0000] "GET /wp-login.php?action=register HTTP/1.0" 200 206 "http://www.philipotoole.com/" "Opera/9.80 (Windows NT 6.2; Win64; x64) Presto/2.12.388 Version/12.17"
<134>0 2015-05-06T01:24:41.232895+00:00 fisher apache-access - - 104.140.83.221 - - [06/May/2015:01:24:40 +0000] "GET /wp-login.php?action=register HTTP/1.1" 200 243 "http://www.philipotoole.com/wp-login.php?action=register" "Opera/9.80 (Windows NT 6.2; Win64; x64) Presto/2.12.388 Version/12.17"
<134>0 2015-05-06T02:47:54.612953+00:00 fisher apache-access - - 184.68.20.22 - - [06/May/2015:02:47:51 +0000] "GET /wp-login.php HTTP/1.1" 200 243 "-" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/24.0.1309.0 Safari/537.17"
<134>0 2015-05-06T04:20:49.008609+00:00 fisher apache-access - - 193.104.41.186 - - [06/May/2015:04:20:46 +0000] "POST /wp-login.php HTTP/1.1" 200 206 "-" "Opera 10.00"

Perhaps you only want to search for POST accesses to that URL:

login -GET
<134>0 2015-05-06T04:20:49.008609+00:00 fisher apache-access - - 193.104.41.186 - - [06/May/2015:04:20:46 +0000] "POST /wp-login.php HTTP/1.1" 200 206 "-" "Opera 10.00"

A more sophisticated client program is planned.

Diagnostics

Basic statistics and diagnostics are available when Ekanite is started with the -diag command-line option, which sets the host and port to bind. For example, with -diag=localhost:9951, visit http://localhost:9951/debug/vars to retrieve this information.

Reporting

Ekanite reports a small amount of anonymous data to Loggly each time it is launched. This data is just the host operating system and system architecture and is only used to track the number of Ekanite deployments. Reporting can be disabled by passing -noreport=true to Ekanite at launch time.

Project Status

The project is actively developed and is early stage software -- contributions in the form of bug reports and pull requests are welcome. Much work remains around performance and scaling.

Documentation

Constants

const (
	DefaultNumShards       = 16
	DefaultIndexDuration   = 24 * time.Hour
	DefaultRetentionPeriod = 24 * time.Hour

	RetentionCheckInterval = time.Hour
)

Functions

func DeleteIndex

func DeleteIndex(i *Index) error

DeleteIndex deletes the index.

Types

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

Batcher accepts "input events", and once it has a certain number, or a certain amount of time has passed, sends those as indexable Events to an Indexer. It also supports a maximum number of unprocessed Events it will keep pending. Once this limit is reached, it will not accept any more until outstanding Events are processed.

func NewBatcher

func NewBatcher(e EventIndexer, sz int, dur time.Duration, max int) *Batcher

NewBatcher returns a Batcher for EventIndexer e, a batching size of sz, a maximum duration of dur, and a maximum outstanding count of max.

func (*Batcher) C

func (b *Batcher) C() chan<- *input.Event

C returns the channel on the batcher to which events should be sent.

func (*Batcher) Start

func (b *Batcher) Start(errChan chan<- error) error

Start starts the batching process.

type DocID

type DocID string

DocID is a 32-character string encoding two 64-bit unsigned integers. When sorting DocIDs, the first 16 characters, reading from the left, represent the most significant 64-bit number, and the next 16 characters represent the least significant 64-bit number.
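One way to obtain an ID with these properties is fixed-width, zero-padded hexadecimal, sketched below. The encoding is an assumption made for illustration (the description above gives only the lengths), and makeID is a hypothetical helper. The point is that plain string comparison then orders IDs by the first number and, on ties, by the second.

```go
package main

import (
	"fmt"
	"sort"
)

// makeID encodes two 64-bit unsigned integers as a 32-character string:
// 16 hex digits for the most significant number, then 16 for the least
// significant one. Zero padding keeps every ID the same length, so
// lexicographic order matches numeric order.
func makeID(hi, lo uint64) string {
	return fmt.Sprintf("%016x%016x", hi, lo)
}

func main() {
	ids := []string{makeID(2, 1), makeID(1, 500), makeID(1, 7)}
	sort.Strings(ids)
	for _, id := range ids {
		fmt.Println(id)
	}
}
```

If the first number were derived from a message's parsed timestamp, sorting IDs would also sort messages by time, which fits the ordering behaviour described in the README.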

type DocIDs

type DocIDs []DocID

func (DocIDs) Len

func (a DocIDs) Len() int

func (DocIDs) Less

func (a DocIDs) Less(i, j int) bool

func (DocIDs) Swap

func (a DocIDs) Swap(i, j int)

type Document

type Document interface {
	ID() DocID
	Data() interface{}
	Source() []byte
}

Document specifies the interface required by an object if it is to be indexed.

type Engine

type Engine struct {
	NumShards       int           // Number of shards to use when creating an index.
	IndexDuration   time.Duration // Duration of created indexes.
	RetentionPeriod time.Duration // How long after Index end-time to hang onto data.

	Logger *log.Logger
	// contains filtered or unexported fields
}

Engine is the component that performs all indexing.

func NewEngine

func NewEngine(path string) *Engine

NewEngine returns a new indexing engine, which will use any data located at path.

func (*Engine) Close

func (e *Engine) Close() error

Close closes the engine.

func (*Engine) Index

func (e *Engine) Index(events []*Event) error

Index indexes a batch of Events. It blocks until all processing has completed.

func (*Engine) Open

func (e *Engine) Open() error

Open opens the engine.

func (*Engine) Path

func (e *Engine) Path() string

Path returns the path to the indexed data directory.

func (*Engine) Search

func (e *Engine) Search(query string) (<-chan string, error)

Search performs a search.

func (*Engine) Total

func (e *Engine) Total() (uint64, error)

Total returns the total number of documents indexed.

type Event

type Event struct {
	*input.Event
}

Event is a log message that can be indexed.

func NewEvent

func NewEvent() *Event

NewEvent returns a new Event.

func (Event) Data

func (e Event) Data() interface{}

Data returns the indexable data.

func (Event) ID

func (e Event) ID() DocID

ID returns a unique ID for the event.

func (Event) Source

func (e Event) Source() []byte

Source returns the original received data.

type EventIndexer

type EventIndexer interface {
	Index(events []*Event) error
}

type Index

type Index struct {
	Shards []*Shard         // Individual bleve indexes
	Alias  bleve.IndexAlias // All bleve indexes as one reference, for search
	// contains filtered or unexported fields
}

Index represents a collection of shards. It contains data for a specific time range.

func NewIndex

func NewIndex(path string, startTime, endTime time.Time, numShards int) (*Index, error)

NewIndex returns an Index for the given start and end time, with the requested shards. It returns an error if an index already exists at the path.

func OpenIndex

func OpenIndex(path string) (*Index, error)

OpenIndex opens an existing index, at the given path.

func (*Index) Close

func (i *Index) Close() error

Close closes the index.

func (*Index) Contains

func (i *Index) Contains(t time.Time) bool

Contains returns whether the index's time range includes the given reference time.

func (*Index) Document

func (i *Index) Document(id DocID) ([]byte, error)

Document returns the source from the index for the given ID.

func (*Index) EndTime

func (i *Index) EndTime() time.Time

EndTime returns the exclusive end time of the index.

func (*Index) Expired

func (i *Index) Expired(t time.Time, r time.Duration) bool

Expired returns whether the index has expired at the given time, if the retention period is r.

func (*Index) Index

func (i *Index) Index(documents []Document) error

Index indexes the slice of documents in the index. It takes care of all shard routing.

func (*Index) Path

func (i *Index) Path() string

Path returns the path to storage for the index.

func (*Index) Search

func (i *Index) Search(q string) (DocIDs, error)

Search performs a search of the index using the given query. Returns IDs of documents which satisfy all queries. Returns Doc IDs in sorted order, ascending.

func (*Index) Shard

func (i *Index) Shard(docId DocID) *Shard

Shard returns the shard from the index, for the given doc ID.
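How a document ID is mapped to a shard is not described here. A common approach, shown purely as an assumption and not as Ekanite's actual routing, is to hash the ID and take the result modulo the number of shards. The shardFor function is hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor maps a document ID onto one of n shards by hashing the ID
// with FNV-1a. The same ID always lands on the same shard, which is
// what lets a later lookup by ID go straight to one shard.
func shardFor(id string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(id)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(n))
}

func main() {
	const numShards = 16 // matches DefaultNumShards
	for _, id := range []string{"0000000000000001000000000000002a", "00000000000000010000000000000007"} {
		fmt.Printf("%s -> shard %d\n", id, shardFor(id, numShards))
	}
}
```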

func (*Index) StartTime

func (i *Index) StartTime() time.Time

StartTime returns the inclusive start time of the index.

func (*Index) Total

func (i *Index) Total() (uint64, error)

Total returns the number of documents in the index.

type Indexes

type Indexes []*Index

func (Indexes) Len

func (i Indexes) Len() int

Indexes are ordered by decreasing end time. If two indexes have the same end time, then order by decreasing start time. This means that the first index in the slice covers the latest time range.

func (Indexes) Less

func (i Indexes) Less(u, v int) bool

func (Indexes) Swap

func (i Indexes) Swap(u, v int)

type Searcher

type Searcher interface {
	Search(query string) (<-chan string, error)
}

type Server

type Server struct {
	Logger *log.Logger
	// contains filtered or unexported fields
}

Server serves query client connections.

func NewServer

func NewServer(iface string, searcher Searcher) *Server

NewServer returns a new Server instance.

func (*Server) Addr

func (s *Server) Addr() net.Addr

Addr returns the address to which the Server is bound.

func (*Server) Start

func (s *Server) Start() error

Start instructs the Server to bind to the interface and accept connections.

type Shard

type Shard struct {
	// contains filtered or unexported fields
}

Shard is the basic data store for indexed data. Indexing operations are not goroutine safe, and only one indexing operation should occur at a time.

func NewShard

func NewShard(path string) *Shard

NewShard returns a shard using the data at the given path.

func (*Shard) Close

func (s *Shard) Close() error

Close closes the shard.

func (*Shard) Document

func (s *Shard) Document(id DocID) ([]byte, error)

Document returns the source from the shard for the given ID.

func (*Shard) Index

func (s *Shard) Index(documents []Document) error

Index indexes a slice of Documents in the shard.

func (*Shard) Open

func (s *Shard) Open() error

Open opens the shard. If no data exists at the shard's path, an empty shard will be created.

func (*Shard) Total

func (s *Shard) Total() (uint64, error)

Total returns the number of events in the shard.

Directories

Path            Synopsis
cmd/ekanited    command
