feat: implement and test tailer file component

This commit is contained in:
2022-09-07 18:32:08 +02:00
parent 25a0b90803
commit 95f86d69f4
439 changed files with 190322 additions and 0 deletions

54
pkg/monitor/monitor.go Normal file
View File

@ -0,0 +1,54 @@
package monitor
import (
"fmt"
"github.com/nxadm/tail"
"go.uber.org/zap"
"regexp"
)
type Monitor interface {
Close() error
Watch(filename string, filter *regexp.Regexp, lineChan chan<- string) error
}
func New() *LogMonitor {
return &LogMonitor{
cancel: make(chan interface{}),
}
}
type LogMonitor struct {
cancel chan interface{}
}
func (l *LogMonitor) Close() error {
close(l.cancel)
return nil
}
func (l *LogMonitor) Watch(filename string, filter *regexp.Regexp, lineChan chan<- string) error {
t, err := tail.TailFile(filename, tail.Config{Follow: true, ReOpen: true, Logger: tail.DiscardingLogger})
if err != nil {
return fmt.Errorf("unable to begin to tail file '%s': %w", filename, err)
}
for {
select {
case line := <-t.Lines:
if line.Err != nil {
zap.S().Errorf("tail error: %v", err)
continue
}
zap.S().Infof("new line: '%s'", line.Text)
if filter.MatchString(line.Text) {
lineChan <- line.Text
}
case <-l.cancel:
t.Stop()
t.Cleanup()
return nil
}
}
return nil
}

113
pkg/monitor/monitor_test.go Normal file
View File

@ -0,0 +1,113 @@
package monitor
import (
"os"
"path"
"reflect"
"regexp"
"sync"
"testing"
"time"
)
func TestLogMonitor_Watch(t *testing.T) {
type args struct {
filter *regexp.Regexp
}
type data struct {
lines []string
}
tests := []struct {
name string
args args
data data
want []string
}{
{
name: "watch all lines",
args: args{
regexp.MustCompile(".*"),
},
data: data{lines: []string{"line1", "line2"}},
want: []string{"line1", "line2"},
},
{
name: "match none lines",
args: args{
regexp.MustCompile(".*with unknown pattern.*"),
},
data: data{lines: []string{"line1", "line2"}},
want: []string{},
},
{
name: "match one line",
args: args{
regexp.MustCompile(".* to keep$"),
},
data: data{lines: []string{"line1", "line2 to keep"}},
want: []string{"line2 to keep"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
dir, err := os.MkdirTemp("", "watch")
logfile := path.Join(dir, "file.log")
if err != nil {
t.Errorf("unable to init tmp dir: %v", err)
return
}
defer os.RemoveAll(dir)
l := New()
defer l.Close()
chanLines := make(chan string, 5)
go func() {
err := l.Watch(logfile, tt.args.filter, chanLines)
if err != nil {
t.Errorf("unable to start to watching logs: %v", err)
}
}()
// Create new file to watch
file, err := os.Create(logfile)
if err != nil {
t.Errorf("unable to create log file: %v", err)
}
defer file.Close()
// Write test data into watched file
waitEndWriting := sync.WaitGroup{}
waitEndWriting.Add(len(tt.data.lines))
go func() {
for _, line := range tt.data.lines {
_, err = file.WriteString(line + "\n")
if err != nil {
t.Errorf("unable to write line '%v': %v", line, err)
return
}
waitEndWriting.Done()
}
}()
lineCpt := 0
waitEndWriting.Wait()
got := make([]string, 0, len(tt.want))
for len(got) < len(tt.want) {
select {
case line := <-chanLines:
got = append(got, line)
lineCpt++
case <-time.After(1 * time.Millisecond):
t.Errorf("waiting line timeout")
t.FailNow()
}
}
if !reflect.DeepEqual(tt.want, got) {
t.Errorf("LogMonitor_Watch(), bad lines published: got = %v, want %v", got, tt.want)
}
})
}
}