Merge pull request #9 from crawlab-team/develop

Develop
This commit is contained in:
暗音
2019-10-28 11:06:21 +08:00
committed by GitHub
29 changed files with 179 additions and 158 deletions

View File

@@ -2,6 +2,7 @@ package database
import (
"context"
"crawlab/utils"
"fmt"
"github.com/apex/log"
"github.com/gomodule/redigo/redis"
@@ -26,7 +27,7 @@ func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...s
tick := time.NewTicker(time.Second * 3)
defer tick.Stop()
go func() {
defer func() { _ = psc.Close() }()
defer utils.Close(psc)
for {
switch msg := psc.Receive().(type) {
case error:
@@ -87,7 +88,7 @@ func (r *Redis) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...s
}
func (r *Redis) Publish(channel, message string) (n int, err error) {
conn := r.pool.Get()
defer func() { _ = conn.Close() }()
defer utils.Close(conn)
n, err = redis.Int(conn.Do("PUBLISH", channel, message))
if err != nil {
return 0, errors2.Wrapf(err, "redis publish %s %s", channel, message)

View File

@@ -1,6 +1,10 @@
package database
import (
"context"
"crawlab/entity"
"crawlab/utils"
"github.com/apex/log"
"github.com/gomodule/redigo/redis"
"github.com/spf13/viper"
"runtime/debug"
@@ -18,7 +22,7 @@ func NewRedisClient() *Redis {
}
func (r *Redis) RPush(collection string, value interface{}) error {
c := r.pool.Get()
defer c.Close()
defer utils.Close(c)
if _, err := c.Do("RPUSH", collection, value); err != nil {
debug.PrintStack()
@@ -29,7 +33,7 @@ func (r *Redis) RPush(collection string, value interface{}) error {
func (r *Redis) LPop(collection string) (string, error) {
c := r.pool.Get()
defer c.Close()
defer utils.Close(c)
value, err2 := redis.String(c.Do("LPOP", collection))
if err2 != nil {
@@ -40,7 +44,7 @@ func (r *Redis) LPop(collection string) (string, error) {
func (r *Redis) HSet(collection string, key string, value string) error {
c := r.pool.Get()
defer c.Close()
defer utils.Close(c)
if _, err := c.Do("HSET", collection, key, value); err != nil {
debug.PrintStack()
@@ -51,7 +55,7 @@ func (r *Redis) HSet(collection string, key string, value string) error {
func (r *Redis) HGet(collection string, key string) (string, error) {
c := r.pool.Get()
defer c.Close()
defer utils.Close(c)
value, err2 := redis.String(c.Do("HGET", collection, key))
if err2 != nil {
@@ -62,7 +66,7 @@ func (r *Redis) HGet(collection string, key string) (string, error) {
func (r *Redis) HDel(collection string, key string) error {
c := r.pool.Get()
defer c.Close()
defer utils.Close(c)
if _, err := c.Do("HDEL", collection, key); err != nil {
return err
@@ -72,7 +76,7 @@ func (r *Redis) HDel(collection string, key string) error {
func (r *Redis) HKeys(collection string) ([]string, error) {
c := r.pool.Get()
defer c.Close()
defer utils.Close(c)
value, err2 := redis.Strings(c.Do("HKeys", collection))
if err2 != nil {
@@ -120,3 +124,22 @@ func InitRedis() error {
RedisClient = NewRedisClient()
return nil
}
func Pub(channel string, msg entity.NodeMessage) error {
if _, err := RedisClient.Publish(channel, utils.GetJson(msg)); err != nil {
log.Errorf("publish redis error: %s", err.Error())
debug.PrintStack()
return err
}
return nil
}
func Sub(channel string, consume ConsumeFunc) error {
ctx := context.Background()
if err := RedisClient.Subscribe(ctx, consume, channel); err != nil {
log.Errorf("subscribe redis error: %s", err.Error())
debug.PrintStack()
return err
}
return nil
}

View File

@@ -24,7 +24,6 @@ func (O OPError) Error() string {
switch O.Scope {
case ScopeSystem:
scope = "system"
break
case ScopeBusiness:
scope = "business"
}

View File

@@ -44,17 +44,14 @@ func TestFuncPanicRecovery(t *testing.T) {
WithChain(Recover(newBufLogger(&buf))))
cron.Start()
defer cron.Stop()
cron.AddFunc("* * * * * ?", func() {
_, _ = cron.AddFunc("* * * * * ?", func() {
panic("YOLO")
})
select {
case <-time.After(OneSecond):
if !strings.Contains(buf.String(), "YOLO") {
t.Error("expected a panic to be logged, got none")
}
return
<-time.After(OneSecond)
if !strings.Contains(buf.String(), "YOLO") {
t.Error("expected a panic to be logged, got none")
}
}
type DummyJob struct{}
@@ -71,7 +68,7 @@ func TestJobPanicRecovery(t *testing.T) {
WithChain(Recover(newBufLogger(&buf))))
cron.Start()
defer cron.Stop()
cron.AddJob("* * * * * ?", job)
_, _ = cron.AddJob("* * * * * ?", job)
select {
case <-time.After(OneSecond):
@@ -102,7 +99,7 @@ func TestStopCausesJobsToNotRun(t *testing.T) {
cron := newWithSeconds()
cron.Start()
cron.Stop()
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
select {
case <-time.After(OneSecond):
@@ -118,7 +115,7 @@ func TestAddBeforeRunning(t *testing.T) {
wg.Add(1)
cron := newWithSeconds()
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
cron.Start()
defer cron.Stop()
@@ -138,7 +135,7 @@ func TestAddWhileRunning(t *testing.T) {
cron := newWithSeconds()
cron.Start()
defer cron.Stop()
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
select {
case <-time.After(OneSecond):
@@ -154,7 +151,7 @@ func TestAddWhileRunningWithDelay(t *testing.T) {
defer cron.Stop()
time.Sleep(5 * time.Second)
var calls int64
cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) })
_, _ = cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) })
<-time.After(OneSecond)
if atomic.LoadInt64(&calls) != 1 {
@@ -205,7 +202,7 @@ func TestSnapshotEntries(t *testing.T) {
wg.Add(1)
cron := New()
cron.AddFunc("@every 2s", func() { wg.Done() })
_, _ = cron.AddFunc("@every 2s", func() { wg.Done() })
cron.Start()
defer cron.Stop()
@@ -232,12 +229,12 @@ func TestMultipleEntries(t *testing.T) {
wg.Add(2)
cron := newWithSeconds()
cron.AddFunc("0 0 0 1 1 ?", func() {})
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("0 0 0 1 1 ?", func() {})
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
id1, _ := cron.AddFunc("* * * * * ?", func() { t.Fatal() })
id2, _ := cron.AddFunc("* * * * * ?", func() { t.Fatal() })
cron.AddFunc("0 0 0 31 12 ?", func() {})
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("0 0 0 31 12 ?", func() {})
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
cron.Remove(id1)
cron.Start()
@@ -257,9 +254,9 @@ func TestRunningJobTwice(t *testing.T) {
wg.Add(2)
cron := newWithSeconds()
cron.AddFunc("0 0 0 1 1 ?", func() {})
cron.AddFunc("0 0 0 31 12 ?", func() {})
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("0 0 0 1 1 ?", func() {})
_, _ = cron.AddFunc("0 0 0 31 12 ?", func() {})
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
cron.Start()
defer cron.Stop()
@@ -276,9 +273,9 @@ func TestRunningMultipleSchedules(t *testing.T) {
wg.Add(2)
cron := newWithSeconds()
cron.AddFunc("0 0 0 1 1 ?", func() {})
cron.AddFunc("0 0 0 31 12 ?", func() {})
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("0 0 0 1 1 ?", func() {})
_, _ = cron.AddFunc("0 0 0 31 12 ?", func() {})
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
cron.Schedule(Every(time.Minute), FuncJob(func() {}))
cron.Schedule(Every(time.Second), FuncJob(func() { wg.Done() }))
cron.Schedule(Every(time.Hour), FuncJob(func() {}))
@@ -310,7 +307,7 @@ func TestLocalTimezone(t *testing.T) {
now.Second()+1, now.Second()+2, now.Minute(), now.Hour(), now.Day(), now.Month())
cron := newWithSeconds()
cron.AddFunc(spec, func() { wg.Done() })
_, _ = cron.AddFunc(spec, func() { wg.Done() })
cron.Start()
defer cron.Stop()
@@ -344,7 +341,7 @@ func TestNonLocalTimezone(t *testing.T) {
now.Second()+1, now.Second()+2, now.Minute(), now.Hour(), now.Day(), now.Month())
cron := New(WithLocation(loc), WithParser(secondParser))
cron.AddFunc(spec, func() { wg.Done() })
_, _ = cron.AddFunc(spec, func() { wg.Done() })
cron.Start()
defer cron.Stop()
@@ -386,7 +383,7 @@ func TestBlockingRun(t *testing.T) {
wg.Add(1)
cron := newWithSeconds()
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
var unblockChan = make(chan struct{})
@@ -410,7 +407,7 @@ func TestStartNoop(t *testing.T) {
var tickChan = make(chan struct{}, 2)
cron := newWithSeconds()
cron.AddFunc("* * * * * ?", func() {
_, _ = cron.AddFunc("* * * * * ?", func() {
tickChan <- struct{}{}
})
@@ -438,10 +435,10 @@ func TestJob(t *testing.T) {
wg.Add(1)
cron := newWithSeconds()
cron.AddJob("0 0 0 30 Feb ?", testJob{wg, "job0"})
cron.AddJob("0 0 0 1 1 ?", testJob{wg, "job1"})
_, _ = cron.AddJob("0 0 0 30 Feb ?", testJob{wg, "job0"})
_, _ = cron.AddJob("0 0 0 1 1 ?", testJob{wg, "job1"})
job2, _ := cron.AddJob("* * * * * ?", testJob{wg, "job2"})
cron.AddJob("1 0 0 1 1 ?", testJob{wg, "job3"})
_, _ = cron.AddJob("1 0 0 1 1 ?", testJob{wg, "job3"})
cron.Schedule(Every(5*time.Second+5*time.Nanosecond), testJob{wg, "job4"})
job5 := cron.Schedule(Every(5*time.Minute), testJob{wg, "job5"})
@@ -465,7 +462,7 @@ func TestJob(t *testing.T) {
// Ensure the entries are in the right order.
expecteds := []string{"job2", "job4", "job5", "job1", "job3", "job0"}
var actuals []string
var actuals = make([]string, 0, len(cron.Entries()))
for _, entry := range cron.Entries() {
actuals = append(actuals, entry.Job.(testJob).name)
}
@@ -545,7 +542,7 @@ func (*ZeroSchedule) Next(time.Time) time.Time {
func TestJobWithZeroTimeDoesNotRun(t *testing.T) {
cron := newWithSeconds()
var calls int64
cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) })
_, _ = cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) })
cron.Schedule(new(ZeroSchedule), FuncJob(func() { t.Error("expected zero task will not run") }))
cron.Start()
defer cron.Stop()
@@ -582,11 +579,11 @@ func TestStopAndWait(t *testing.T) {
t.Run("a couple fast jobs added, still returns immediately", func(t *testing.T) {
cron := newWithSeconds()
cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() {})
cron.Start()
cron.AddFunc("* * * * * *", func() {})
cron.AddFunc("* * * * * *", func() {})
cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() {})
time.Sleep(time.Second)
ctx := cron.Stop()
select {
@@ -598,10 +595,10 @@ func TestStopAndWait(t *testing.T) {
t.Run("a couple fast jobs and a slow job added, waits for slow job", func(t *testing.T) {
cron := newWithSeconds()
cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() {})
cron.Start()
cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) })
cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) })
_, _ = cron.AddFunc("* * * * * *", func() {})
time.Sleep(time.Second)
ctx := cron.Stop()
@@ -625,10 +622,10 @@ func TestStopAndWait(t *testing.T) {
t.Run("repeated calls to stop, waiting for completion and after", func(t *testing.T) {
cron := newWithSeconds()
cron.AddFunc("* * * * * *", func() {})
cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) })
_, _ = cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) })
cron.Start()
cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() {})
time.Sleep(time.Second)
ctx := cron.Stop()
ctx2 := cron.Stop()

View File

@@ -9,10 +9,10 @@ import (
)
// DefaultLogger is used by Cron if none is specified.
var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))
var DefaultLogger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))
// DiscardLogger can be used by callers to discard all log messages.
var DiscardLogger Logger = PrintfLogger(log.New(ioutil.Discard, "", 0))
var DiscardLogger = PrintfLogger(log.New(ioutil.Discard, "", 0))
// Logger is the interface used in this package for logging, so that any backend
// can be plugged in. It is a subset of the github.com/go-logr/logr interface.

View File

@@ -30,7 +30,7 @@ func TestWithVerboseLogger(t *testing.T) {
t.Error("expected provided logger")
}
c.AddFunc("@every 1s", func() {})
_, _ = c.AddFunc("@every 1s", func() {})
c.Start()
time.Sleep(OneSecond)
c.Stop()

View File

@@ -304,6 +304,7 @@ func TestNormalizeFields_Errors(t *testing.T) {
actual, err := normalizeFields(test.input, test.options)
if err == nil {
t.Errorf("expected an error, got none. results: %v", actual)
return
}
if !strings.Contains(err.Error(), test.err) {
t.Errorf("expected error %q, got %q", test.err, err.Error())

View File

@@ -178,8 +178,8 @@ WRAP:
// restrictions are satisfied by the given time.
func dayMatches(s *SpecSchedule, t time.Time) bool {
var (
domMatch bool = 1<<uint(t.Day())&s.Dom > 0
dowMatch bool = 1<<uint(t.Weekday())&s.Dow > 0
domMatch = 1<<uint(t.Day())&s.Dom > 0
dowMatch = 1<<uint(t.Weekday())&s.Dow > 0
)
if s.Dom&starBit > 0 || s.Dow&starBit > 0 {
return domMatch && dowMatch

View File

@@ -1,6 +1,7 @@
package main
import (
"context"
"crawlab/config"
"crawlab/database"
"crawlab/lib/validate_bridge"
@@ -12,7 +13,13 @@ import (
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/spf13/viper"
"net"
"net/http"
"os"
"os/signal"
"runtime/debug"
"syscall"
"time"
)
func main() {
@@ -166,8 +173,26 @@ func main() {
// 运行服务器
host := viper.GetString("server.host")
port := viper.GetString("server.port")
if err := app.Run(host + ":" + port); err != nil {
address := net.JoinHostPort(host, port)
srv := &http.Server{
Handler: app,
Addr: address,
}
go func() {
if err := srv.ListenAndServe(); err != nil {
if err != http.ErrServerClosed {
log.Error("run server error:" + err.Error())
} else {
log.Info("server graceful down")
}
}
}()
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
ctx2, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
if err := srv.Shutdown(ctx2); err != nil {
log.Error("run server error:" + err.Error())
panic(err)
}
}

View File

@@ -188,7 +188,7 @@ func DeleteNode(c *gin.Context) {
id := bson.ObjectId("5d429e6c19f7abede924fee2")
for _, node := range NodeList {
if node.Id == bson.ObjectId(id) {
if node.Id == id {
log.Infof("Delete a node")
}
}

View File

@@ -1,6 +1,7 @@
package model
import (
"crawlab/utils"
"github.com/apex/log"
"os"
"runtime/debug"
@@ -21,7 +22,7 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) {
debug.PrintStack()
return nil, err
}
defer f.Close()
defer utils.Close(f)
const bufLen = 1 * 1024 * 1024
logBuf := make([]byte, bufLen)

View File

@@ -34,7 +34,6 @@ type Node struct {
const (
Yes = "Y"
No = "N"
)
// 当前节点是否为主节点

View File

@@ -110,7 +110,7 @@ func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, int, erro
defer s.Close()
// 获取爬虫列表
spiders := []Spider{}
var spiders []Spider
if err := c.Find(filter).Skip(skip).Limit(limit).Sort("+name").All(&spiders); err != nil {
debug.PrintStack()
return spiders, 0, err
@@ -231,7 +231,7 @@ func RemoveAllSpider() error {
s, c := database.GetCol("spiders")
defer s.Close()
spiders := []Spider{}
var spiders []Spider
err := c.Find(nil).All(&spiders)
if err != nil {
log.Error("get all spiders error:" + err.Error())

View File

@@ -66,7 +66,7 @@ func (c *Context) failed(err error, httpCode int, variables ...interface{}) {
"message": "error",
"error": errStr,
})
break
case validator.ValidationErrors:
validatorErrors := causeError.(validator.ValidationErrors)
//firstError := validatorErrors[0].(validator.FieldError)
@@ -75,7 +75,6 @@ func (c *Context) failed(err error, httpCode int, variables ...interface{}) {
"message": "error",
"error": validatorErrors.Error(),
})
break
default:
fmt.Println("deprecated....")
c.AbortWithStatusJSON(httpCode, gin.H{

View File

@@ -74,7 +74,7 @@ func DeleteLogPeriodically() {
for _, fi := range rd {
if fi.IsDir() {
log.Info(filepath.Join(logDir, fi.Name()))
os.RemoveAll(filepath.Join(logDir, fi.Name()))
_ = os.RemoveAll(filepath.Join(logDir, fi.Name()))
log.Info("Delete Log File Success")
}
}

View File

@@ -13,10 +13,8 @@ import (
func TestDeleteLogPeriodically(t *testing.T) {
Convey("Test DeleteLogPeriodically", t, func() {
if err := config.InitConfig("../conf/config.yml"); err != nil {
log.Error("init config error:" + err.Error())
panic(err)
}
err := config.InitConfig("../conf/config.yml")
So(err, ShouldBeNil)
log.Info("初始化配置成功")
logDir := viper.GetString("log.path")
log.Info(logDir)
@@ -28,24 +26,16 @@ func TestGetLocalLog(t *testing.T) {
//create a log file for test
logPath := "../logs/crawlab/test.log"
f, err := os.Create(logPath)
defer f.Close()
defer utils.Close(f)
if err != nil {
fmt.Println(err.Error())
fmt.Println(err)
} else {
_, err = f.WriteString("This is for test")
fmt.Println(err)
}
Convey("Test GetLocalLog", t, func() {
Convey("Test response", func() {
logStr, err := GetLocalLog(logPath)
log.Info(utils.BytesToString(logStr))
fmt.Println(err)
So(err, ShouldEqual, nil)
})
})
//delete the test log file
os.Remove(logPath)
_ = os.Remove(logPath)
}

View File

@@ -2,6 +2,7 @@ package msg_handler
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"

View File

@@ -2,9 +2,9 @@ package msg_handler
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
)
type SystemInfo struct {
@@ -22,7 +22,7 @@ func (s *SystemInfo) Handle() error {
NodeId: s.msg.NodeId,
SysInfo: sysInfo,
}
if err := utils.Pub(constants.ChannelMasterNode, msgSd); err != nil {
if err := database.Pub(constants.ChannelMasterNode, msgSd); err != nil {
return err
}
return nil

View File

@@ -102,7 +102,7 @@ func handleNodeInfo(key string, data Data) {
// 同个key可能因为并发被注册多次
var nodes []model.Node
_ = c.Find(bson.M{"key": key}).All(&nodes)
if nodes != nil && len(nodes) > 1 {
if len(nodes) > 1 {
for _, node := range nodes {
_ = c.RemoveId(node.Id)
}
@@ -155,7 +155,11 @@ func UpdateNodeData() {
}
// 获取redis的key
key, err := register.GetRegister().GetKey()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
// 构造节点数据
data := Data{
Key: key,
@@ -238,19 +242,19 @@ func InitNodeService() error {
if model.IsMaster() {
// 如果为主节点,订阅主节点通信频道
if err := utils.Sub(constants.ChannelMasterNode, MasterNodeCallback); err != nil {
if err := database.Sub(constants.ChannelMasterNode, MasterNodeCallback); err != nil {
return err
}
} else {
// 若为工作节点,订阅单独指定通信频道
channel := constants.ChannelWorkerNode + node.Id.Hex()
if err := utils.Sub(channel, WorkerNodeCallback); err != nil {
if err := database.Sub(channel, WorkerNodeCallback); err != nil {
return err
}
}
// 订阅全通道
if err := utils.Sub(constants.ChannelAllNode, WorkerNodeCallback); err != nil {
if err := database.Sub(constants.ChannelAllNode, WorkerNodeCallback); err != nil {
return err
}

View File

@@ -82,7 +82,7 @@ func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCre
log.Infof("can't opened this file")
return err
}
defer f.Close()
defer utils.Close(f)
s := make([]byte, 4096)
for {
switch nr, err := f.Read(s[:]); true {
@@ -173,7 +173,7 @@ func RemoveSpider(id string) error {
Type: constants.MsgTypeRemoveSpider,
SpiderId: id,
}
if err := utils.Pub(constants.ChannelAllNode, msg); err != nil {
if err := database.Pub(constants.ChannelAllNode, msg); err != nil {
return err
}

View File

@@ -28,7 +28,7 @@ func (s *SpiderSync) CreateMd5File(md5 string) {
fileName := filepath.Join(path, Md5File)
file := utils.OpenFile(fileName)
defer file.Close()
defer utils.Close(file)
if file != nil {
if _, err := file.WriteString(md5 + "\n"); err != nil {
log.Errorf("file write string error: %s", err.Error())
@@ -80,7 +80,7 @@ func (s *SpiderSync) Download() {
defer session.Close()
f, err := gf.OpenId(bson.ObjectIdHex(fileId))
defer f.Close()
defer utils.Close(f)
if err != nil {
log.Errorf("open file id: " + fileId + ", spider id:" + spiderId + ", error: " + err.Error())
debug.PrintStack()
@@ -99,7 +99,7 @@ func (s *SpiderSync) Download() {
// 创建临时文件
tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip")
tmpFile := utils.OpenFile(tmpFilePath)
defer tmpFile.Close()
defer utils.Close(tmpFile)
// 将该文件写入临时文件
if _, err := io.Copy(tmpFile, f); err != nil {

View File

@@ -446,33 +446,29 @@ func ExecuteTask(id int) {
func GetTaskLog(id string) (logStr string, err error) {
task, err := model.GetTask(id)
if err != nil {
return "", err
return
}
logStr = ""
if IsMasterNode(task.NodeId.Hex()) {
// 若为主节点,获取本机日志
logBytes, err := model.GetLocalLog(task.LogPath)
logStr = utils.BytesToString(logBytes)
if err != nil {
log.Errorf(err.Error())
logStr = err.Error()
// return "", err
} else {
logStr = utils.BytesToString(logBytes)
}
} else {
// 若不为主节点,获取远端日志
logStr, err = GetRemoteLog(task)
if err != nil {
log.Errorf(err.Error())
return "", err
}
return logStr, err
}
// 若不为主节点,获取远端日志
logStr, err = GetRemoteLog(task)
if err != nil {
log.Errorf(err.Error())
return logStr, nil
}
return logStr, err
}
func CancelTask(id string) (err error) {

View File

@@ -17,9 +17,7 @@ func InitUserService() error {
Password: utils.EncryptPassword("admin"),
Role: constants.RoleAdmin,
}
if err := adminUser.Add(); err != nil {
// pass
}
_ = adminUser.Add()
return nil
}
func MakeToken(user *model.User) (tokenStr string, err error) {

View File

@@ -21,7 +21,7 @@ func RemoveFiles(path string) {
// 读取文件一行
func ReadFileOneLine(fileName string) string {
file := OpenFile(fileName)
defer file.Close()
defer Close(file)
buf := bufio.NewReader(file)
line, err := buf.ReadString('\n')
if err != nil {
@@ -57,10 +57,7 @@ func CreateFilePath(filePath string) {
func Exists(path string) bool {
_, err := os.Stat(path) //os.Stat获取文件信息
if err != nil {
if os.IsExist(err) {
return true
}
return false
return os.IsExist(err)
}
return true
}
@@ -88,7 +85,7 @@ func DeCompressByPath(tarFile, dest string) error {
if err != nil {
return err
}
defer srcFile.Close()
defer Close(srcFile)
return DeCompress(srcFile, dest)
}
@@ -112,7 +109,7 @@ func DeCompress(srcFile *os.File, dstPath string) error {
debug.PrintStack()
return err
}
defer zipFile.Close()
defer Close(zipFile)
// 遍历zip内所有文件和目录
for _, innerFile := range zipFile.File {
@@ -156,7 +153,7 @@ func DeCompress(srcFile *os.File, dstPath string) error {
debug.PrintStack()
continue
}
defer newFile.Close()
defer Close(newFile)
// 拷贝该文件到新文件中
if _, err := io.Copy(newFile, srcFile); err != nil {
@@ -184,9 +181,9 @@ func DeCompress(srcFile *os.File, dstPath string) error {
//dest 压缩文件存放地址
func Compress(files []*os.File, dest string) error {
d, _ := os.Create(dest)
defer d.Close()
defer Close(d)
w := zip.NewWriter(d)
defer w.Close()
defer Close(w)
for _, file := range files {
err := _Compress(file, "", w)
if err != nil {
@@ -234,7 +231,7 @@ func _Compress(file *os.File, prefix string, zw *zip.Writer) error {
return err
}
_, err = io.Copy(writer, file)
file.Close()
Close(file)
if err != nil {
debug.PrintStack()
return err

View File

@@ -60,7 +60,7 @@ func TestCompress(t *testing.T) {
So(er, ShouldEqual, nil)
})
})
os.RemoveAll("testCompress")
_ = os.RemoveAll("testCompress")
}
func Zip(zipFile string, fileList []string) error {
@@ -69,16 +69,11 @@ func Zip(zipFile string, fileList []string) error {
if err != nil {
log.Fatal()
}
defer fw.Close()
defer Close(fw)
// 实例化新的 zip.Writer
zw := zip.NewWriter(fw)
defer func() {
// 检测一下是否成功关闭
if err := zw.Close(); err != nil {
log.Fatalln(err)
}
}()
defer Close(zw)
for _, fileName := range fileList {
fr, err := os.Open(fileName)
@@ -91,6 +86,9 @@ func Zip(zipFile string, fileList []string) error {
}
// 写入文件的头信息
fh, err := zip.FileInfoHeader(fi)
if err != nil {
return err
}
w, err := zw.CreateHeader(fh)
if err != nil {
return err
@@ -106,6 +104,10 @@ func Zip(zipFile string, fileList []string) error {
func TestDeCompress(t *testing.T) {
err := os.Mkdir("testDeCompress", os.ModePerm)
if err != nil {
t.Error(err)
}
err = Zip("demo.zip", []string{})
if err != nil {
t.Error("create zip file failed")
@@ -121,7 +123,7 @@ func TestDeCompress(t *testing.T) {
err := DeCompress(tmpFile, dstPath)
So(err, ShouldEqual, nil)
})
os.RemoveAll("testDeCompress")
os.Remove("demo.zip")
_ = os.RemoveAll("testDeCompress")
_ = os.Remove("demo.zip")
}

View File

@@ -1,12 +1,11 @@
package utils
import (
"context"
"crawlab/database"
"crawlab/entity"
"encoding/json"
"github.com/apex/log"
"github.com/gomodule/redigo/redis"
"io"
"runtime/debug"
"unsafe"
)
@@ -35,21 +34,9 @@ func GetMessage(message redis.Message) *entity.NodeMessage {
return &msg
}
func Pub(channel string, msg entity.NodeMessage) error {
if _, err := database.RedisClient.Publish(channel, GetJson(msg)); err != nil {
log.Errorf("publish redis error: %s", err.Error())
debug.PrintStack()
return err
func Close(c io.Closer) {
err := c.Close()
if err != nil {
log.WithError(err).Error("关闭资源文件失败。")
}
return nil
}
func Sub(channel string, consume database.ConsumeFunc) error {
ctx := context.Background()
if err := database.RedisClient.Subscribe(ctx, consume, channel); err != nil {
log.Errorf("subscribe redis error: %s", err.Error())
debug.PrintStack()
return err
}
return nil
}

View File

@@ -12,15 +12,16 @@ func IsObjectIdNull(id bson.ObjectId) bool {
}
func InterfaceToString(value interface{}) string {
switch value.(type) {
switch realValue := value.(type) {
case bson.ObjectId:
return value.(bson.ObjectId).Hex()
return realValue.Hex()
case string:
return value.(string)
return realValue
case int:
return strconv.Itoa(value.(int))
return strconv.Itoa(realValue)
case time.Time:
return value.(time.Time).String()
return realValue.String()
default:
return ""
}
return ""
}

View File

@@ -8,7 +8,7 @@ import (
func EncryptPassword(str string) string {
w := md5.New()
io.WriteString(w, str)
_, _ = io.WriteString(w, str)
md5str := fmt.Sprintf("%x", w.Sum(nil))
return md5str
}

View File

@@ -6,7 +6,7 @@ then
:
else
jspath=`ls /app/dist/js/app.*.js`
sed -i "s?localhost:8000?${CRAWLAB_API_ADDRESS}?g" ${jspath}
sed -i "s?http://localhost:8000?${CRAWLAB_API_ADDRESS}?g" ${jspath}
fi
# replace base url