pod-volume-cleaner/pkg/monitor/monitor.go

82 lines
1.5 KiB
Go
Raw Normal View History

package monitor
import (
"fmt"
"github.com/nxadm/tail"
"go.uber.org/zap"
"regexp"
)
// Option is a functional option that adjusts the settings of a LogMonitor.
// New applies every Option it is given to the monitor it constructs.
type Option func(m *LogMonitor)
// WithTailFollow returns an Option that sets the monitor's follow flag, i.e.
// whether it keeps looking for new lines (like `tail -f`).
func WithTailFollow(follow bool) Option {
	apply := func(m *LogMonitor) {
		m.follow = follow
	}
	return apply
}
// WithReOpen returns an Option that sets the monitor's reopen flag, i.e.
// whether recreated files are reopened (like `tail -F`).
func WithReOpen(reopen bool) Option {
	return Option(func(m *LogMonitor) {
		m.reopen = reopen
	})
}
// Monitor is the behaviour exposed by a log watcher.
type Monitor interface {
	// Watch observes filename and sends the lines selected by filter to
	// lineChan, returning an error if watching fails.
	Watch(filename string, filter *regexp.Regexp, lineChan chan<- string) error

	// Close shuts the monitor down.
	Close() error
}
// New builds a LogMonitor. Both follow and reopen default to true and a fresh
// cancel channel is allocated; the supplied options are then applied in the
// order given, so later options override earlier ones.
func New(opts ...Option) *LogMonitor {
	m := new(LogMonitor)
	m.follow = true
	m.reopen = true
	m.cancel = make(chan interface{})

	for i := range opts {
		opts[i](m)
	}
	return m
}
// LogMonitor tails a file and forwards matching lines to a channel (see
// Watch). Instances should be created with New so that the cancel channel is
// allocated.
type LogMonitor struct {
	// follow is handed to the tailer as its Follow setting.
	follow bool
	// reopen is handed to the tailer as its ReOpen setting.
	reopen bool
	// cancel is closed by Close to tell a running Watch to stop.
	cancel chan interface{}
}

// Close signals any running Watch call to stop by closing the cancel channel.
// It always returns nil.
//
// Calling Close again after it has returned is a no-op rather than a
// "close of closed channel" panic. Concurrent calls to Close from several
// goroutines are still not synchronised.
func (l *LogMonitor) Close() error {
	select {
	case <-l.cancel:
		// A receive only succeeds once the channel is closed (nothing ever
		// sends on it), so the monitor has already been closed.
	default:
		close(l.cancel)
	}
	return nil
}
// Watch tails filename and sends the text of every line matched by filter to
// lineChan. It blocks until either the tailer stops delivering lines or Close
// is called, and returns nil in both cases.
//
// An error is returned only when the file cannot be opened for tailing.
// Errors attached to individual lines are logged and the line is skipped.
// filter must be non-nil.
func (l *LogMonitor) Watch(filename string, filter *regexp.Regexp, lineChan chan<- string) error {
	t, err := tail.TailFile(filename, tail.Config{Follow: l.follow, ReOpen: l.reopen, Logger: tail.DiscardingLogger})
	if err != nil {
		return fmt.Errorf("unable to begin to tail file '%s': %w", filename, err)
	}
	// Runs on every exit path, after the tailer has been stopped if needed.
	defer t.Cleanup()

	// stop halts the tailer after a cancellation and reports (but does not
	// propagate) any error, so a normal shutdown still returns nil.
	stop := func() {
		if stopErr := t.Stop(); stopErr != nil {
			zap.S().Errorf("stopping tail of '%s': %v", filename, stopErr)
		}
	}

	for {
		select {
		case line := <-t.Lines:
			if line == nil {
				// A nil line is what a receive yields once the Lines channel
				// has been closed: there is nothing more to read.
				return nil
			}
			if line.Err != nil {
				// Log the error carried by the line; the outer err is always
				// nil at this point.
				zap.S().Errorf("tail error: %v", line.Err)
				continue
			}
			if !filter.MatchString(line.Text) {
				continue
			}
			// Keep honouring Close while waiting for the consumer, otherwise
			// a stalled reader would make Watch impossible to stop.
			select {
			case lineChan <- line.Text:
			case <-l.cancel:
				stop()
				return nil
			}
		case <-l.cancel:
			stop()
			return nil
		}
	}
}