package cleaner import ( "fmt" "github.com/cyrilix/pod-cleaner/pkg/metrics" "github.com/cyrilix/pod-cleaner/pkg/monitor" "go.uber.org/zap" "regexp" "strconv" "sync" ) var ( defaultRegex = regexp.MustCompile("There were many similar errors. Turn up verbosity to see them.\" err=\\\"orphaned pod \\\\\"(?P[0-9a-f\\-]+)\\\\\" found, but error not a directory occurred when trying to remove the volumes dir\" numErrs=(?P[0-9]+)") ) func NewPodWatcher(logfile string) *PodWatcher { return &PodWatcher{ m: monitor.New(), rex: defaultRegex, logfile: logfile, cancel: make(chan interface{}), } } type PodWatcher struct { m monitor.Monitor rex *regexp.Regexp logfile string cancel chan interface{} muPodError sync.RWMutex numErrors int podIdErrors string } func (w *PodWatcher) Close() error { close(w.cancel) if w.m != nil { err := w.m.Close() if err != nil { zap.S().Errorf("unable to close monitor resource: %v", err) } } return nil } func (w *PodWatcher) GetNumErrors() int { w.muPodError.RLock() defer w.muPodError.RUnlock() return w.numErrors } func (w *PodWatcher) GetPodDirInError() string { w.muPodError.RLock() defer w.muPodError.RUnlock() if w.podIdErrors == "" { return "" } return fmt.Sprintf("/var/lib/kubelet/pods/%s", w.podIdErrors) } func (w *PodWatcher) Watch() { lineChan := make(chan string) go func() { err := w.m.Watch(w.logfile, w.rex, lineChan) if err != nil { zap.S().Panicf("unable to init watching on file '%s', %v", w.logfile, err) } close(w.cancel) }() for { select { case line := <-lineChan: w.processLogRecord(line) case <-w.cancel: return } } } func (w *PodWatcher) processLogRecord(line string) { numErrors, err := strconv.Atoi(w.rex.FindStringSubmatch(line)[w.rex.SubexpIndex("numErrs")]) if err != nil { zap.S().Errorf("unable to extract numErrors value from line '%v': %v", line, err) return } w.muPodError.Lock() defer w.muPodError.Unlock() w.numErrors = numErrors w.podIdErrors = w.rex.FindStringSubmatch(line)[w.rex.SubexpIndex("podId")] metrics.NumErrors.Set(float64(numErrors)) }