diff --git a/README.md b/README.md
new file mode 100644
index 0000000..61ab1c7
--- /dev/null
+++ b/README.md
@@ -0,0 +1,5 @@
+# pod-cleaner
+
+Watch syslog file and generate alerts on orphaned pod error messages.
+
+See https://github.com/kubernetes/kubernetes/issues/105536
diff --git a/pkg/cleaner/cleaner.go b/pkg/cleaner/cleaner.go
index 41d92a4..114c989 100644
--- a/pkg/cleaner/cleaner.go
+++ b/pkg/cleaner/cleaner.go
@@ -1 +1,100 @@
 package cleaner
+
+import (
+	"fmt"
+	"github.com/cyrilix/pod-cleaner/pkg/monitor"
+	"go.uber.org/zap"
+	"regexp"
+	"strconv"
+	"sync"
+)
+
+var (
+	// defaultRegex matches the kubelet "orphaned pod" summary record and captures
+	// the pod UID (podId) and the reported error count (numErrs).
+	defaultRegex = regexp.MustCompile("There were many similar errors. Turn up verbosity to see them.\" err=\\\"orphaned pod \\\\\"(?P<podId>[0-9a-f\\-]+)\\\\\" found, but error not a directory occurred when trying to remove the volumes dir\" numErrs=(?P<numErrs>[0-9]+)")
+)
+
+// NewPodWatcher returns a PodWatcher that scans logfile with the default
+// monitor and the default orphaned pod pattern.
+func NewPodWatcher(logfile string) *PodWatcher {
+	return &PodWatcher{
+		m:       monitor.New(),
+		rex:     defaultRegex,
+		logfile: logfile,
+		cancel:  make(chan interface{}),
+	}
+}
+
+// PodWatcher keeps the pod id and error count of the last orphaned pod record
+// read from a log file.
+type PodWatcher struct {
+	m       monitor.Monitor
+	rex     *regexp.Regexp
+	logfile string
+
+	cancel chan interface{}
+
+	// muPodError guards numErrors and podIdErrors.
+	muPodError  sync.RWMutex
+	numErrors   int
+	podIdErrors string
+}
+
+// GetNumErrors returns the error count of the last matching record, 0 if none.
+func (w *PodWatcher) GetNumErrors() int {
+	w.muPodError.RLock()
+	defer w.muPodError.RUnlock()
+	return w.numErrors
+}
+
+// GetPodDirInError returns the kubelet directory of the last reported pod, or
+// an empty string when no record has matched yet.
+func (w *PodWatcher) GetPodDirInError() string {
+	w.muPodError.RLock()
+	defer w.muPodError.RUnlock()
+	if w.podIdErrors == "" {
+		return ""
+	}
+	return fmt.Sprintf("/var/lib/kubelet/pods/%s", w.podIdErrors)
+}
+
+// Watch processes matching lines until the monitor stops, then returns.
+func (w *PodWatcher) Watch() {
+	lineChan := make(chan string)
+	go func() {
+		err := w.m.Watch(w.logfile, w.rex, lineChan)
+		if err != nil {
+			zap.S().Panicf("unable to init watching on file '%s', %v", w.logfile, err)
+		}
+		close(w.cancel)
+	}()
+
+	for {
+		select {
+		case line := <-lineChan:
+			w.processLogRecord(line)
+		case <-w.cancel:
+			return
+		}
+	}
+}
+
+// processLogRecord stores the pod id and error count found in line. Lines that
+// do not match the pattern are logged and ignored instead of panicking.
+func (w *PodWatcher) processLogRecord(line string) {
+	match := w.rex.FindStringSubmatch(line)
+	if match == nil {
+		zap.S().Errorf("log record does not match orphaned pod pattern: '%v'", line)
+		return
+	}
+	numErrors, err := strconv.Atoi(match[w.rex.SubexpIndex("numErrs")])
+	if err != nil {
+		zap.S().Errorf("unable to extract numErrors value from line '%v': %v", line, err)
+		return
+	}
+	w.muPodError.Lock()
+	defer w.muPodError.Unlock()
+	w.numErrors = numErrors
+	w.podIdErrors = match[w.rex.SubexpIndex("podId")]
+}
diff --git a/pkg/cleaner/cleaner_test.go b/pkg/cleaner/cleaner_test.go
index 41d92a4..2b3a6e2 100644
--- a/pkg/cleaner/cleaner_test.go
+++ b/pkg/cleaner/cleaner_test.go
@@ -1 +1,50 @@
 package cleaner
+
+import (
+	"github.com/cyrilix/pod-cleaner/pkg/monitor"
+	"testing"
+)
+
+func TestPodWatcher_Watch(t *testing.T) {
+	type fields struct {
+		logfile string
+	}
+	tests := []struct {
+		name         string
+		fields       fields
+		wantPodDir   string
+		wantNumError int
+	}{
+		{
+			name: "one error in file",
+			fields: fields{
+				logfile: "test_data/one-error.log",
+			},
+			wantPodDir:   "/var/lib/kubelet/pods/1d4bfc07-3469-4eaa-992f-6d23c17f3aee",
+			wantNumError: 1,
+		},
+		{
+			name: "no error",
+			fields: fields{
+				logfile: "test_data/no-error.log",
+			},
+			wantPodDir:   "",
+			wantNumError: 0,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			w := NewPodWatcher(tt.fields.logfile)
+			w.m = monitor.New(monitor.WithTailFollow(false), monitor.WithReOpen(false))
+			w.Watch()
+			gotNumErr := w.GetNumErrors()
+			if gotNumErr != tt.wantNumError {
+				t.Errorf("PodWatcher_Watch(), bad numErrors found: %v, want %v", gotNumErr, tt.wantNumError)
+			}
+			gotPodDir := w.GetPodDirInError()
+			if gotPodDir != tt.wantPodDir {
+				t.Errorf("PodWatcher_Watch(), bad pod directory found: %v, want %v", gotPodDir, tt.wantPodDir)
+			}
+		})
+	}
+}
diff --git a/pkg/cleaner/test_data/no-error.log b/pkg/cleaner/test_data/no-error.log
new file mode 100644
index 0000000..1ccf2a2
--- /dev/null
+++ b/pkg/cleaner/test_data/no-error.log
@@ -0,0 +1,3 @@
+egreg
+abcffe
+erfer
\ No newline at end of file
diff --git a/pkg/cleaner/test_data/one-error.log b/pkg/cleaner/test_data/one-error.log
new file mode 100644
index 0000000..fd8f775
--- /dev/null
+++ b/pkg/cleaner/test_data/one-error.log
@@ -0,0 +1,4 @@
+egreg
+Oct 07 12:46:32 k8s-master2-staging kubelet[7310]: E1007 12:46:32.359145 7310 kubelet_volumes.go:245] "There were many similar errors. Turn up verbosity to see them." err="orphaned pod \"1d4bfc07-3469-4eaa-992f-6d23c17f3aee\" found, but error not a directory occurred when trying to remove the volumes dir" numErrs=1
+abcffe
+erfer
\ No newline at end of file
diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go
index 5f1ee40..3ba598e 100644
--- a/pkg/monitor/monitor.go
+++ b/pkg/monitor/monitor.go
@@ -7,18 +7,44 @@ import (
 	"regexp"
 )
 
+// Option configures a LogMonitor built by New.
+type Option func(monitor *LogMonitor)
+
+// WithTailFollow Continue looking for new lines (tail -f)
+func WithTailFollow(follow bool) Option {
+	return func(monitor *LogMonitor) {
+		monitor.follow = follow
+	}
+}
+
+// WithReOpen Reopen recreated files (tail -F)
+func WithReOpen(reopen bool) Option {
+	return func(monitor *LogMonitor) {
+		monitor.reopen = reopen
+	}
+}
+
 type Monitor interface {
 	Close() error
 	Watch(filename string, filter *regexp.Regexp, lineChan chan<- string) error
 }
 
-func New() *LogMonitor {
-	return &LogMonitor{
+// New returns a LogMonitor with follow and reopen enabled, then applies opts in order.
+func New(opts ...Option) *LogMonitor {
+	l := &LogMonitor{
+		follow: true,
+		reopen: true,
 		cancel: make(chan interface{}),
 	}
+	for _, opt := range opts {
+		opt(l)
+	}
+	return l
 }
 
 type LogMonitor struct {
+	follow bool
+	reopen bool
 	cancel chan interface{}
 }
 
@@ -28,7 +54,7 @@ func (l *LogMonitor) Close() error {
 }
 
 func (l *LogMonitor) Watch(filename string, filter *regexp.Regexp, lineChan chan<- string) error {
-	t, err := tail.TailFile(filename, tail.Config{Follow: true, ReOpen: true, Logger: tail.DiscardingLogger})
+	t, err := tail.TailFile(filename, tail.Config{Follow: l.follow, ReOpen: l.reopen, Logger: tail.DiscardingLogger})
 	if err != nil {
 		return fmt.Errorf("unable to begin to tail file '%s': %w", filename, err)
 	}
@@ -36,11 +62,15 @@ func (l *LogMonitor) Watch(filename string, filter *regexp.Regexp, lineChan chan
 	for {
 		select {
 		case line := <-t.Lines:
+			if line == nil {
+				// EOF
+				t.Cleanup()
+				return nil
+			}
 			if line.Err != nil {
-				zap.S().Errorf("tail error: %v", err)
+				zap.S().Errorf("tail error: %v", line.Err)
 				continue
 			}
-			zap.S().Infof("new line: '%s'", line.Text)
 			if filter.MatchString(line.Text) {
 				lineChan <- line.Text
 			}
@@ -50,5 +80,4 @@ func (l *LogMonitor) Watch(filename string, filter *regexp.Regexp, lineChan chan
 				return nil
 			}
 		}
-	return nil
 }