diff --git a/core/controllers/utils_http.go b/core/controllers/utils_http.go index a26f4ea1..0d50231d 100644 --- a/core/controllers/utils_http.go +++ b/core/controllers/utils_http.go @@ -3,13 +3,14 @@ package controllers import ( "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/entity" + "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/trace" "github.com/gin-gonic/gin" "net/http" ) -func handleError(statusCode int, c *gin.Context, err error, print bool) { - if print { +func handleError(statusCode int, c *gin.Context, err error) { + if utils.IsDev() { trace.PrintError(err) } c.AbortWithStatusJSON(statusCode, entity.Response{ @@ -20,11 +21,7 @@ func handleError(statusCode int, c *gin.Context, err error, print bool) { } func HandleError(statusCode int, c *gin.Context, err error) { - handleError(statusCode, c, err, true) -} - -func HandleErrorNoPrint(statusCode int, c *gin.Context, err error) { - handleError(statusCode, c, err, false) + handleError(statusCode, c, err) } func HandleErrorBadRequest(c *gin.Context, err error) { @@ -43,10 +40,6 @@ func HandleErrorNotFound(c *gin.Context, err error) { HandleError(http.StatusNotFound, c, err) } -func HandleErrorNotFoundNoPrint(c *gin.Context, err error) { - HandleErrorNoPrint(http.StatusNotFound, c, err) -} - func HandleErrorInternalServerError(c *gin.Context, err error) { HandleError(http.StatusInternalServerError, c, err) } diff --git a/core/entity/http.go b/core/entity/http.go index 60231152..40be927f 100644 --- a/core/entity/http.go +++ b/core/entity/http.go @@ -20,3 +20,5 @@ type ListResponse struct { type BatchRequestPayload struct { Ids []primitive.ObjectID `form:"ids" json:"ids"` } + +type RequestParam map[string]interface{} diff --git a/core/go.mod b/core/go.mod index e4237e2f..68751cd6 100644 --- a/core/go.mod +++ b/core/go.mod @@ -27,7 +27,6 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/go-uuid v1.0.3 - github.com/imroc/req v0.3.2 github.com/mitchellh/go-homedir v1.1.0 github.com/pkg/errors v0.9.1 github.com/robfig/cron/v3 v3.0.0 diff --git a/core/go.sum b/core/go.sum index 48492f37..55467c3e 100644 --- a/core/go.sum +++ b/core/go.sum @@ -346,8 +346,6 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= -github.com/imroc/req v0.3.2 h1:M/JkeU6RPmX+WYvT2vaaOL0K+q8ufL5LxwvJc4xeB4o= -github.com/imroc/req v0.3.2/go.mod h1:F+NZ+2EFSo6EFXdeIbpfE9hcC233id70kf0byW97Caw= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= @@ -614,6 +612,7 @@ golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0 golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -737,6 +736,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -816,6 +816,7 @@ golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= @@ -823,6 +824,7 @@ golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= +golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -838,6 +840,7 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/core/notification/im.go b/core/notification/im.go index ea2da58f..e481e509 100644 --- a/core/notification/im.go +++ b/core/notification/im.go @@ -1,14 +1,21 @@ package notification import ( + "bytes" + "encoding/json" "errors" "fmt" + "github.com/crawlab-team/crawlab/core/entity" + "github.com/crawlab-team/crawlab/core/utils" + "io" + "net/http" + "regexp" + "strings" + "time" + "github.com/apex/log" "github.com/crawlab-team/crawlab/core/models/models" "github.com/crawlab-team/crawlab/trace" - "github.com/imroc/req" - "regexp" - "strings" ) type ResBody struct { @@ -16,6 +23,41 @@ type ResBody struct { ErrMsg string `json:"errmsg"` } +// RequestParam represents parameters for HTTP requests +type RequestParam entity.RequestParam + +// performRequest performs an HTTP request with JSON body +func performRequest(method, url string, data interface{}) (*http.Response, []byte, error) { + var reqBody io.Reader + if data != nil { + jsonData, err := json.Marshal(data) + if err != nil { + return nil, nil, fmt.Errorf("failed to marshal request body: %v", err) + } + reqBody = bytes.NewBuffer(jsonData) + } + + req, err := http.NewRequest(method, url, reqBody) + if err != nil { + return nil, nil, fmt.Errorf("failed to create request: %v", err) + } + + req.Header.Set("Content-Type", "application/json; charset=utf-8") + + resp, err := utils.NewHttpClient(15 * time.Second).Do(req) + if err != nil { + return nil, nil, fmt.Errorf("failed to perform request: %v", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, nil, fmt.Errorf("failed to read response body: %v", err) + } + + return resp, body, nil +} + func SendIMNotification(ch *models.NotificationChannel, title, content string) error { switch ch.Provider { case ChannelIMProviderLark: @@ -34,43 +76,42 @@ func SendIMNotification(ch *models.NotificationChannel, title, content string) e return sendIMMSTeams(ch, title, content) } - // request header - header := req.Header{ - "Content-Type": "application/json; charset=utf-8", - } - // request data - data := req.Param{ + data := RequestParam{ "msgtype": "markdown", - "markdown": req.Param{ + "markdown": RequestParam{ "title": title, "text": content, "content": content, }, - "at": req.Param{ + "at": RequestParam{ "atMobiles": []string{}, "isAtAll": false, }, "text": content, } if strings.Contains(strings.ToLower(ch.WebhookUrl), "feishu") { - data = req.Param{ + data = RequestParam{ "msg_type": "text", - "content": req.Param{ + "content": RequestParam{ "text": content, }, } } // perform request - res, err := req.Post(ch.WebhookUrl, header, req.BodyJSON(&data)) + resp, body, err := performRequest("POST", ch.WebhookUrl, data) if err != nil { return trace.TraceError(err) } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("request failed with status code: %d", resp.StatusCode) + } + // parse response var resBody ResBody - if err := res.ToJSON(&resBody); err != nil { + if err := json.Unmarshal(body, &resBody); err != nil { return trace.TraceError(err) } @@ -82,47 +123,31 @@ func SendIMNotification(ch *models.NotificationChannel, title, content string) e return nil } -func getIMRequestHeader() req.Header { - return req.Header{ - "Content-Type": "application/json; charset=utf-8", - } -} - -func performIMRequest(webhookUrl string, data req.Param) (res *req.Resp, err error) { - // perform request - res, err = req.Post(webhookUrl, getIMRequestHeader(), req.BodyJSON(&data)) +func performIMRequest(webhookUrl string, data RequestParam) ([]byte, error) { + resp, body, err := performRequest("POST", webhookUrl, data) if err != nil { log.Errorf("IM request error: %v", err) return nil, err } - // get response - response := res.Response() - - // check status code - if response.StatusCode >= 400 { - log.Errorf("IM response status code: %d", res.Response().StatusCode) - return nil, errors.New(fmt.Sprintf("IM error response %d: %s", response.StatusCode, res.String())) + if resp.StatusCode >= 400 { + log.Errorf("IM response status code: %d", resp.StatusCode) + return nil, fmt.Errorf("IM error response %d: %s", resp.StatusCode, string(body)) } - return res, nil + return body, nil } -func performIMRequestWithJson[T any](webhookUrl string, data req.Param) (resBody T, err error) { - res, err := performIMRequest(webhookUrl, data) +func performIMRequestWithJson[T any](webhookUrl string, data RequestParam) (resBody T, err error) { + body, err := performIMRequest(webhookUrl, data) if err != nil { return resBody, err } // parse response - if err := res.ToJSON(&resBody); err != nil { + if err := json.Unmarshal(body, &resBody); err != nil { log.Warnf("Parsing IM response error: %v", err) - resText, err := res.ToString() - if err != nil { - log.Warnf("Converting response to string error: %v", err) - return resBody, err - } - log.Infof("IM response: %s", resText) + log.Infof("IM response: %s", string(body)) return resBody, nil } @@ -193,16 +218,16 @@ func convertMarkdownToTelegram(markdownText string) string { } func sendIMLark(ch *models.NotificationChannel, title, content string) error { - data := req.Param{ + data := RequestParam{ "msg_type": "interactive", - "card": req.Param{ - "header": req.Param{ - "title": req.Param{ + "card": RequestParam{ + "header": RequestParam{ + "title": RequestParam{ "tag": "plain_text", "content": title, }, }, - "elements": []req.Param{ + "elements": []RequestParam{ { "tag": "markdown", "content": content, @@ -221,9 +246,9 @@ func sendIMLark(ch *models.NotificationChannel, title, content string) error { } func sendIMDingTalk(ch *models.NotificationChannel, title string, content string) error { - data := req.Param{ + data := RequestParam{ "msgtype": "markdown", - "markdown": req.Param{ + "markdown": RequestParam{ "title": title, "text": fmt.Sprintf("# %s\n\n%s", title, content), }, @@ -239,9 +264,9 @@ func sendIMDingTalk(ch *models.NotificationChannel, title string, content string } func sendIMWechatWork(ch *models.NotificationChannel, title string, content string) error { - data := req.Param{ + data := RequestParam{ "msgtype": "markdown", - "markdown": req.Param{ + "markdown": RequestParam{ "content": fmt.Sprintf("# %s\n\n%s", title, content), }, } @@ -256,10 +281,10 @@ func sendIMWechatWork(ch *models.NotificationChannel, title string, content stri } func sendIMSlack(ch *models.NotificationChannel, title, content string) error { - data := req.Param{ - "blocks": []req.Param{ - {"type": "header", "text": req.Param{"type": "plain_text", "text": title}}, - {"type": "section", "text": req.Param{"type": "mrkdwn", "text": convertMarkdownToSlack(content)}}, + data := RequestParam{ + "blocks": []RequestParam{ + {"type": "header", "text": RequestParam{"type": "plain_text", "text": title}}, + {"type": "section", "text": RequestParam{"type": "mrkdwn", "text": convertMarkdownToSlack(content)}}, }, } _, err := performIMRequest(ch.WebhookUrl, data) @@ -291,7 +316,7 @@ func sendIMTelegram(ch *models.NotificationChannel, title string, content string text = convertMarkdownToTelegram(text) // request data - data := req.Param{ + data := RequestParam{ "chat_id": chatId, "text": text, "parse_mode": "MarkdownV2", @@ -306,8 +331,8 @@ func sendIMTelegram(ch *models.NotificationChannel, title string, content string } func sendIMDiscord(ch *models.NotificationChannel, title string, content string) error { - data := req.Param{ - "embeds": []req.Param{ + data := RequestParam{ + "embeds": []RequestParam{ { "title": title, "description": content, @@ -322,16 +347,16 @@ func sendIMDiscord(ch *models.NotificationChannel, title string, content string) } func sendIMMSTeams(ch *models.NotificationChannel, title string, content string) error { - data := req.Param{ + data := RequestParam{ "type": "message", - "attachments": []req.Param{{ + "attachments": []RequestParam{{ "contentType": "application/vnd.microsoft.card.adaptive", "contentUrl": nil, - "content": req.Param{ + "content": RequestParam{ "$schema": "https://adaptivecards.io/schemas/adaptive-card.json", "type": "AdaptiveCard", "version": "1.2", - "body": []req.Param{ + "body": []RequestParam{ { "type": "TextBlock", "text": fmt.Sprintf("**%s**", title), diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index ca6a4d77..13d87bce 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -309,6 +309,11 @@ func (r *Runner) configureEnv() { } func (r *Runner) createHttpRequest(method, path string) (*http.Response, error) { + // Normalize path + if strings.HasPrefix(path, "/") { + path = path[1:] + } + // Construct master URL var id string if r.s.GitId.IsZero() { @@ -316,7 +321,7 @@ func (r *Runner) createHttpRequest(method, path string) (*http.Response, error) } else { id = r.s.GitId.Hex() } - url := fmt.Sprintf("%s/sync/%s%s", utils.GetApiEndpoint(), id, path) + url := fmt.Sprintf("%s/sync/%s/%s", utils.GetApiEndpoint(), id, path) // Create and execute request req, err := http.NewRequest(method, url, nil) @@ -357,7 +362,8 @@ func (r *Runner) syncFiles() (err error) { var masterFiles map[string]entity.FsFileInfo err = json.Unmarshal(body, &masterFiles) if err != nil { - log.Errorf("error unmarshaling JSON: %v", err) + log.Errorf("error unmarshaling JSON for URL: %s", resp.Request.URL.String()) + log.Errorf("error details: %v", err) return err } diff --git a/core/task/handler/service.go b/core/task/handler/service.go index 12b6d284..d3288957 100644 --- a/core/task/handler/service.go +++ b/core/task/handler/service.go @@ -239,7 +239,7 @@ func (svc *Service) getRunner(taskId primitive.ObjectID) (r interfaces.TaskRunne } func (svc *Service) addRunner(taskId primitive.ObjectID, r interfaces.TaskRunner) { - log.Debugf("[TaskHandlerService] addRunner: taskId[%v]", taskId) + log.Debugf("[TaskHandlerService] addRunner: taskId[%s]", taskId.Hex()) svc.runners.Store(taskId, r) } diff --git a/core/utils/http.go b/core/utils/http.go index b0e48ebf..2c212f15 100644 --- a/core/utils/http.go +++ b/core/utils/http.go @@ -6,6 +6,7 @@ import ( "github.com/crawlab-team/crawlab/trace" "github.com/gin-gonic/gin" "net/http" + "time" ) func handleError(statusCode int, c *gin.Context, err error, print bool) { @@ -30,3 +31,9 @@ func HandleErrorUnauthorized(c *gin.Context, err error) { func HandleErrorInternalServerError(c *gin.Context, err error) { HandleError(http.StatusInternalServerError, c, err) } + +func NewHttpClient(timeout time.Duration) *http.Client { + return &http.Client{ + Timeout: timeout, + } +}