mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
support crawlab runtime log to ES
This commit is contained in:
@@ -40,6 +40,8 @@ other:
|
||||
tmppath: "/tmp"
|
||||
version: 0.4.10
|
||||
setting:
|
||||
crawlabLogToES: "N" # Send crawlab runtime log to ES, open this option "Y", remember to set esClient
|
||||
crawlabLogIndex: "crawlab-log"
|
||||
allowRegister: "N"
|
||||
enableTutorial: "N"
|
||||
runOnMaster: "Y"
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/apex/log"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gin-gonic/gin/binding"
|
||||
"github.com/olivere/elastic/v7"
|
||||
"github.com/spf13/viper"
|
||||
"net"
|
||||
"net/http"
|
||||
@@ -133,6 +134,15 @@ func main() {
|
||||
// 以下为主节点服务
|
||||
if model.IsMaster() {
|
||||
// 中间件
|
||||
esClientStr := viper.GetString("setting.esClient")
|
||||
if viper.GetString("setting.crawlabLogToES") == "Y" && esClientStr != "" {
|
||||
ctx := context.Background()
|
||||
esClient, err := elastic.NewClient(elastic.SetURL(esClientStr), elastic.SetSniff(false))
|
||||
if err != nil {
|
||||
log.Error("Init es client Error:" + err.Error())
|
||||
}
|
||||
app.Use(middlewares.EsLog(ctx, esClient))
|
||||
}
|
||||
app.Use(middlewares.CORSMiddleware())
|
||||
anonymousGroup := app.Group("/")
|
||||
{
|
||||
|
||||
57
backend/middlewares/es_log.go
Normal file
57
backend/middlewares/es_log.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package middlewares
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/olivere/elastic/v7"
|
||||
"github.com/satori/go.uuid"
|
||||
"github.com/spf13/viper"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
func EsLog(ctx context.Context, esClient *elastic.Client) gin.HandlerFunc {
|
||||
|
||||
return func(c *gin.Context) {
|
||||
// 开始时间
|
||||
crawlabIndex := viper.GetString("setting.crawlabLogIndex")
|
||||
sig := make(chan struct{}, 1)
|
||||
sig <- struct{}{}
|
||||
start := time.Now()
|
||||
// 处理请求
|
||||
c.Next()
|
||||
// 结束时间
|
||||
end := time.Now()
|
||||
//执行时间
|
||||
latency := strconv.FormatInt(int64(end.Sub(start).Milliseconds()), 10)
|
||||
path := c.Request.URL.Path
|
||||
|
||||
clientIP := c.ClientIP()
|
||||
method := c.Request.Method
|
||||
statusCode := strconv.Itoa(c.Writer.Status())
|
||||
buf := new(bytes.Buffer)
|
||||
buf.ReadFrom(c.Request.Body)
|
||||
b := buf.String()
|
||||
accessLog := "costTime:" + latency + "ms--" + "StatusCode:" + statusCode + "--" + "Method:" + method + "--" + "ClientIp:" + clientIP + "--" +
|
||||
"RequestURI:" + path + "--" + "Host:" + c.Request.Host + "--" + "UserAgent--" + c.Request.UserAgent() + "--RequestBody:" +
|
||||
string(b)
|
||||
WriteMsg(ctx, crawlabIndex, esClient, time.Now(), accessLog, sig)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// WriteMsg will write the msg and level into es
|
||||
func WriteMsg(ctx context.Context, crawlabIndex string, es *elastic.Client, when time.Time, msg string, sig chan struct{}) error {
|
||||
<-sig
|
||||
vals := make(map[string]interface{})
|
||||
vals["@timestamp"] = when.Format(time.RFC3339)
|
||||
vals["@msg"] = msg
|
||||
uid := uuid.NewV4().String()
|
||||
_, err := es.Index().Index(crawlabIndex).Id(uid).BodyJson(vals).Refresh("wait_for").Do(ctx)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
Reference in New Issue
Block a user