Merge pull request #733 from hantmac/develop

bug fix: fix es bug
This commit is contained in:
Marvin Zhang
2020-05-15 11:21:42 +08:00
committed by GitHub

View File

@@ -168,9 +168,9 @@ func SetEnv(cmd *exec.Cmd, envs []model.Env, task model.Task, spider model.Spide
func SetLogConfig(wg *sync.WaitGroup, cmd *exec.Cmd, t model.Task, u model.User) error {
//esChan := make(chan string, 1)
//esClientStr := viper.GetString("setting.esClient")
//spiderLogIndex := viper.GetString("setting.spiderLogIndex")
esChan := make(chan string, 1)
esClientStr := viper.GetString("setting.esClient")
spiderLogIndex := viper.GetString("setting.spiderLogIndex")
// get stdout reader
stdout, err := cmd.StdoutPipe()
readerStdout := bufio.NewReader(stdout)
@@ -195,9 +195,9 @@ func SetLogConfig(wg *sync.WaitGroup, cmd *exec.Cmd, t model.Task, u model.User)
isStderrFinished := false
// periodically (5 sec) insert log items
//wg.Add(3)
wg.Add(3)
go func() {
//defer wg.Done()
defer wg.Done()
for {
_ = model.AddLogItems(logs)
logs = []model.LogItem{}
@@ -217,7 +217,7 @@ func SetLogConfig(wg *sync.WaitGroup, cmd *exec.Cmd, t model.Task, u model.User)
// read stdout
go func() {
//defer wg.Done()
defer wg.Done()
for {
line, err := readerStdout.ReadString('\n')
if err != nil {
@@ -234,10 +234,11 @@ func SetLogConfig(wg *sync.WaitGroup, cmd *exec.Cmd, t model.Task, u model.User)
Ts: time.Now(),
ExpireTs: time.Now().Add(time.Duration(expireDuration) * time.Second),
}
//esChan <- l.Message
//if esClientStr != "" {
// go database.WriteMsgToES(time.Now(), esChan, spiderLogIndex)
//}
if esClientStr != "" {
esChan <- l.Message
go database.WriteMsgToES(time.Now(), esChan, spiderLogIndex)
}
logs = append(logs, l)
}
@@ -245,7 +246,7 @@ func SetLogConfig(wg *sync.WaitGroup, cmd *exec.Cmd, t model.Task, u model.User)
// read stderr
go func() {
//defer wg.Done()
defer wg.Done()
for {
line, err := readerStderr.ReadString('\n')
if err != nil {
@@ -262,15 +263,16 @@ func SetLogConfig(wg *sync.WaitGroup, cmd *exec.Cmd, t model.Task, u model.User)
Ts: time.Now(),
ExpireTs: time.Now().Add(time.Duration(expireDuration) * time.Second),
}
//esChan <- l.Message
//if esClientStr != "" {
// go database.WriteMsgToES(time.Now(), esChan, spiderLogIndex)
//}
if esClientStr != "" {
esChan <- l.Message
go database.WriteMsgToES(time.Now(), esChan, spiderLogIndex)
}
logs = append(logs, l)
}
}()
//wg.Wait()
wg.Wait()
return nil
}