Merge branch 'develop' of https://github.com/crawlab-team/crawlab into develop

This commit is contained in:
marvzhang
2019-11-01 13:11:18 +08:00
68 changed files with 1564 additions and 1073 deletions

View File

@@ -1,3 +1,19 @@
# 0.3.5 (2019-10-28)
### Features / Enhancement
- **Graceful Shutdown**. [detail](https://github.com/crawlab-team/crawlab/commit/63fab3917b5a29fd9770f9f51f1572b9f0420385)
- **Node Info Optimization**. [detail](https://github.com/crawlab-team/crawlab/commit/973251a0fbe7a2184ac0da09e0404a17c736aee7)
- **Append System Environment Variables to Tasks**. [detail](https://github.com/crawlab-team/crawlab/commit/4ab4892471965d6342d30385578ca60dc51f8ad3)
- **Auto Refresh Task Log**. [detail](https://github.com/crawlab-team/crawlab/commit/4ab4892471965d6342d30385578ca60dc51f8ad3)
- **Enable HTTPS Deployment**. [detail](https://github.com/crawlab-team/crawlab/commit/5d8f6f0c56768a6e58f5e46cbf5adff8c7819228)
### Bug Fixes
- **Unable to fetch spider list info in schedule jobs**. [detail](https://github.com/crawlab-team/crawlab/commit/311f72da19094e3fa05ab4af49812f58843d8d93)
- **Unable to fetch node info from worker nodes**. [detail](https://github.com/crawlab-team/crawlab/commit/6af06efc17685a9e232e8c2b5fd819ec7d2d1674)
- **Unable to select node when trying to run spider tasks**. [detail](https://github.com/crawlab-team/crawlab/commit/31f8e03234426e97aed9b0bce6a50562f957edad)
- **Unable to fetch result count when result volume is large**. [#260](https://github.com/crawlab-team/crawlab/issues/260)
- **Node issue in schedule tasks**. [#244](https://github.com/crawlab-team/crawlab/issues/244)
# 0.3.1 (2019-08-25)
### Features / Enhancement
- **Docker Image Optimization**. Split docker further into master, worker, frontend with alpine image.

View File

@@ -14,7 +14,8 @@ ADD ./frontend /app
WORKDIR /app
# install frontend
RUN npm install -g yarn && yarn install
RUN npm config set unsafe-perm true
RUN npm install -g yarn && yarn install --registry=https://registry.npm.taobao.org
RUN npm run build:prod
@@ -56,4 +57,4 @@ EXPOSE 8080
EXPOSE 8000
# start backend
CMD ["/bin/sh", "/app/docker_init.sh"]
CMD ["/bin/sh", "/app/docker_init.sh"]

View File

@@ -21,6 +21,7 @@
三种方式:
1. [Docker](https://tikazyq.github.io/crawlab-docs/Installation/Docker.html)(推荐)
2. [直接部署](https://tikazyq.github.io/crawlab-docs/Installation/Direct.html)(了解内核)
3. [Kubernetes](https://mp.weixin.qq.com/s/3Q1BQATUIEE_WXcHPqhYbA)
### 要求Docker
- Docker 18.03+
@@ -46,7 +47,7 @@ services:
image: tikazyq/crawlab:latest
container_name: master
environment:
CRAWLAB_API_ADDRESS: "localhost:8000"
CRAWLAB_API_ADDRESS: "http://localhost:8000"
CRAWLAB_SERVER_MASTER: "Y"
CRAWLAB_MONGO_HOST: "mongo"
CRAWLAB_REDIS_ADDRESS: "redis"

View File

@@ -21,6 +21,7 @@ Golang-based distributed web crawler management platform, supporting various lan
Two methods:
1. [Docker](https://tikazyq.github.io/crawlab-docs/Installation/Docker.html) (Recommended)
2. [Direct Deploy](https://tikazyq.github.io/crawlab-docs/Installation/Direct.html) (Check Internal Kernel)
3. [Kubernetes](https://mp.weixin.qq.com/s/3Q1BQATUIEE_WXcHPqhYbA)
### Pre-requisite (Docker)
- Docker 18.03+
@@ -47,7 +48,7 @@ services:
image: tikazyq/crawlab:latest
container_name: master
environment:
CRAWLAB_API_ADDRESS: "localhost:8000"
CRAWLAB_API_ADDRESS: "http://localhost:8000"
CRAWLAB_SERVER_MASTER: "Y"
CRAWLAB_MONGO_HOST: "mongo"
CRAWLAB_REDIS_ADDRESS: "redis"

View File

@@ -0,0 +1,9 @@
package constants

// Redis pub/sub channel names used for inter-node messaging.
const (
	// ChannelAllNode is the broadcast channel that every node subscribes to.
	ChannelAllNode = "nodes:public"
	// ChannelWorkerNode is the per-worker channel prefix; presumably the
	// worker's node key is appended to it — TODO confirm at call sites.
	ChannelWorkerNode = "nodes:"
	// ChannelMasterNode is the channel the master node listens on.
	ChannelMasterNode = "nodes:master"
)

View File

@@ -5,4 +5,5 @@ const (
MsgTypeGetSystemInfo = "get-sys-info"
MsgTypeCancelTask = "cancel-task"
MsgTypeRemoveLog = "remove-log"
MsgTypeRemoveSpider = "remove-spider"
)

View File

@@ -1,11 +1,18 @@
package constants
const (
StatusPending string = "pending"
StatusRunning string = "running"
StatusFinished string = "finished"
StatusError string = "error"
// 调度中
StatusPending string = "pending"
// 运行中
StatusRunning string = "running"
// 已完成
StatusFinished string = "finished"
// 错误
StatusError string = "error"
// 取消
StatusCancelled string = "cancelled"
// 节点重启导致的异常终止
StatusAbnormal string = "abnormal"
)
const (

View File

@@ -3,6 +3,7 @@ package database
import (
"github.com/globalsign/mgo"
"github.com/spf13/viper"
"net"
"time"
)
@@ -39,13 +40,28 @@ func InitMongo() error {
var mongoAuth = viper.GetString("mongo.authSource")
if Session == nil {
var uri string
if mongoUsername == "" {
uri = "mongodb://" + mongoHost + ":" + mongoPort + "/" + mongoDb
} else {
uri = "mongodb://" + mongoUsername + ":" + mongoPassword + "@" + mongoHost + ":" + mongoPort + "/" + mongoDb + "?authSource=" + mongoAuth
var dialInfo mgo.DialInfo
addr := net.JoinHostPort(mongoHost, mongoPort)
timeout := time.Second * 10
dialInfo = mgo.DialInfo{
Addrs: []string{addr},
Timeout: timeout,
Database: mongoDb,
PoolLimit: 100,
PoolTimeout: timeout,
ReadTimeout: timeout,
WriteTimeout: timeout,
AppName: "crawlab",
FailFast: true,
MinPoolSize: 10,
MaxIdleTimeMS: 1000 * 30,
}
sess, err := mgo.DialWithTimeout(uri, time.Second*5)
if mongoUsername != "" {
dialInfo.Username = mongoUsername
dialInfo.Password = mongoPassword
dialInfo.Source = mongoAuth
}
sess, err := mgo.DialWithInfo(&dialInfo)
if err != nil {
return err
}

View File

@@ -2,6 +2,7 @@ package database
import (
"context"
"crawlab/utils"
"fmt"
"github.com/apex/log"
"github.com/gomodule/redigo/redis"
@@ -26,14 +27,13 @@ func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...s
tick := time.NewTicker(time.Second * 3)
defer tick.Stop()
go func() {
defer func() { _ = psc.Close() }()
defer utils.Close(psc)
for {
switch msg := psc.Receive().(type) {
case error:
done <- fmt.Errorf("redis pubsub receive err: %v", msg)
return
case redis.Message:
fmt.Println(msg)
if err := consume(msg); err != nil {
fmt.Printf("redis pubsub consume message err: %v", err)
continue
@@ -88,7 +88,7 @@ func (r *Redis) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...s
}
func (r *Redis) Publish(channel, message string) (n int, err error) {
conn := r.pool.Get()
defer func() { _ = conn.Close() }()
defer utils.Close(conn)
n, err = redis.Int(conn.Do("PUBLISH", channel, message))
if err != nil {
return 0, errors2.Wrapf(err, "redis publish %s %s", channel, message)

View File

@@ -1,6 +1,10 @@
package database
import (
"context"
"crawlab/entity"
"crawlab/utils"
"github.com/apex/log"
"github.com/gomodule/redigo/redis"
"github.com/spf13/viper"
"runtime/debug"
@@ -18,7 +22,7 @@ func NewRedisClient() *Redis {
}
func (r *Redis) RPush(collection string, value interface{}) error {
c := r.pool.Get()
defer c.Close()
defer utils.Close(c)
if _, err := c.Do("RPUSH", collection, value); err != nil {
debug.PrintStack()
@@ -29,7 +33,7 @@ func (r *Redis) RPush(collection string, value interface{}) error {
func (r *Redis) LPop(collection string) (string, error) {
c := r.pool.Get()
defer c.Close()
defer utils.Close(c)
value, err2 := redis.String(c.Do("LPOP", collection))
if err2 != nil {
@@ -40,7 +44,7 @@ func (r *Redis) LPop(collection string) (string, error) {
func (r *Redis) HSet(collection string, key string, value string) error {
c := r.pool.Get()
defer c.Close()
defer utils.Close(c)
if _, err := c.Do("HSET", collection, key, value); err != nil {
debug.PrintStack()
@@ -51,7 +55,7 @@ func (r *Redis) HSet(collection string, key string, value string) error {
func (r *Redis) HGet(collection string, key string) (string, error) {
c := r.pool.Get()
defer c.Close()
defer utils.Close(c)
value, err2 := redis.String(c.Do("HGET", collection, key))
if err2 != nil {
@@ -62,7 +66,7 @@ func (r *Redis) HGet(collection string, key string) (string, error) {
func (r *Redis) HDel(collection string, key string) error {
c := r.pool.Get()
defer c.Close()
defer utils.Close(c)
if _, err := c.Do("HDEL", collection, key); err != nil {
return err
@@ -72,7 +76,7 @@ func (r *Redis) HDel(collection string, key string) error {
func (r *Redis) HKeys(collection string) ([]string, error) {
c := r.pool.Get()
defer c.Close()
defer utils.Close(c)
value, err2 := redis.Strings(c.Do("HKeys", collection))
if err2 != nil {
@@ -120,3 +124,22 @@ func InitRedis() error {
RedisClient = NewRedisClient()
return nil
}
// Pub serializes msg to JSON and publishes it on the given Redis channel.
// A publish failure is logged together with a stack trace and returned.
func Pub(channel string, msg entity.NodeMessage) error {
	_, err := RedisClient.Publish(channel, utils.GetJson(msg))
	if err == nil {
		return nil
	}
	log.Errorf("publish redis error: %s", err.Error())
	debug.PrintStack()
	return err
}
// Sub subscribes consume to the given Redis channel under a background
// context. A subscription failure is logged with a stack trace and returned.
func Sub(channel string, consume ConsumeFunc) error {
	err := RedisClient.Subscribe(context.Background(), consume, channel)
	if err == nil {
		return nil
	}
	log.Errorf("subscribe redis error: %s", err.Error())
	debug.PrintStack()
	return err
}

17
backend/entity/common.go Normal file
View File

@@ -0,0 +1,17 @@
package entity
import "strconv"
// Page holds pagination parameters derived from query-string values.
type Page struct {
	Skip     int // number of documents to skip: PageSize * (PageNum - 1)
	Limit    int // maximum number of documents to return (== PageSize)
	PageNum  int // 1-based page number
	PageSize int // page size as parsed from the request
}

// GetPage parses the pageNum/pageSize query-string values and fills in
// Skip/Limit. A non-numeric or non-positive pageNum is clamped to 1 so that
// Skip can never go negative (a negative skip is an error in mgo); pageSize
// is kept exactly as parsed (0 on bad input, which callers treat as
// "no limit").
func (p *Page) GetPage(pageNum string, pageSize string) {
	p.PageNum, _ = strconv.Atoi(pageNum)
	p.PageSize, _ = strconv.Atoi(pageSize)
	if p.PageNum < 1 {
		// Atoi failed or the client sent 0/negative; default to the
		// first page instead of producing a negative Skip.
		p.PageNum = 1
	}
	p.Skip = p.PageSize * (p.PageNum - 1)
	p.Limit = p.PageSize
}

25
backend/entity/node.go Normal file
View File

@@ -0,0 +1,25 @@
package entity

// NodeMessage is the payload exchanged between nodes over Redis pub/sub.
// Only the fields relevant to a given message Type are populated.
type NodeMessage struct {
	// Message category (see the MsgType* constants).
	Type string `json:"type"`
	// Task-related: ID of the task this message refers to.
	TaskId string `json:"task_id"`
	// Node-related: ID of the node this message refers to.
	NodeId string `json:"node_id"`
	// Log-related fields.
	LogPath string `json:"log_path"` // log file path
	Log     string `json:"log"`      // log content
	// System information reported by a node.
	SysInfo SystemInfo `json:"sys_info"`
	// Spider-related: ID of the spider this message refers to.
	SpiderId string `json:"spider_id"`
	// Error message, set when the message reports a failure.
	Error string `json:"error"`
}

6
backend/entity/spider.go Normal file
View File

@@ -0,0 +1,6 @@
package entity

// SpiderType is a MongoDB aggregation result: the number of spiders of a
// given type. The group key (_id) is mapped onto Type.
type SpiderType struct {
	Type  string `json:"type" bson:"_id"`
	Count int    `json:"count" bson:"count"`
}

15
backend/entity/system.go Normal file
View File

@@ -0,0 +1,15 @@
package entity

// SystemInfo describes the host a node runs on, as reported to the master.
type SystemInfo struct {
	ARCH        string       `json:"arch"`      // CPU architecture, e.g. "x86"
	OS          string       `json:"os"`        // operating system, e.g. "linux"
	Hostname    string       `json:"host_name"` // host name of the node
	NumCpu      int          `json:"num_cpu"`   // number of logical CPUs
	Executables []Executable `json:"executables"`
}

// Executable is a runnable interpreter/binary discovered on a node.
type Executable struct {
	Path        string `json:"path"`         // full path of the executable
	FileName    string `json:"file_name"`    // base file name
	DisplayName string `json:"display_name"` // human-friendly name for the UI
}

View File

@@ -24,7 +24,6 @@ func (O OPError) Error() string {
switch O.Scope {
case ScopeSystem:
scope = "system"
break
case ScopeBusiness:
scope = "business"
}

View File

@@ -44,17 +44,14 @@ func TestFuncPanicRecovery(t *testing.T) {
WithChain(Recover(newBufLogger(&buf))))
cron.Start()
defer cron.Stop()
cron.AddFunc("* * * * * ?", func() {
_, _ = cron.AddFunc("* * * * * ?", func() {
panic("YOLO")
})
select {
case <-time.After(OneSecond):
if !strings.Contains(buf.String(), "YOLO") {
t.Error("expected a panic to be logged, got none")
}
return
<-time.After(OneSecond)
if !strings.Contains(buf.String(), "YOLO") {
t.Error("expected a panic to be logged, got none")
}
}
type DummyJob struct{}
@@ -71,7 +68,7 @@ func TestJobPanicRecovery(t *testing.T) {
WithChain(Recover(newBufLogger(&buf))))
cron.Start()
defer cron.Stop()
cron.AddJob("* * * * * ?", job)
_, _ = cron.AddJob("* * * * * ?", job)
select {
case <-time.After(OneSecond):
@@ -102,7 +99,7 @@ func TestStopCausesJobsToNotRun(t *testing.T) {
cron := newWithSeconds()
cron.Start()
cron.Stop()
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
select {
case <-time.After(OneSecond):
@@ -118,7 +115,7 @@ func TestAddBeforeRunning(t *testing.T) {
wg.Add(1)
cron := newWithSeconds()
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
cron.Start()
defer cron.Stop()
@@ -138,7 +135,7 @@ func TestAddWhileRunning(t *testing.T) {
cron := newWithSeconds()
cron.Start()
defer cron.Stop()
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
select {
case <-time.After(OneSecond):
@@ -154,7 +151,7 @@ func TestAddWhileRunningWithDelay(t *testing.T) {
defer cron.Stop()
time.Sleep(5 * time.Second)
var calls int64
cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) })
_, _ = cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) })
<-time.After(OneSecond)
if atomic.LoadInt64(&calls) != 1 {
@@ -205,7 +202,7 @@ func TestSnapshotEntries(t *testing.T) {
wg.Add(1)
cron := New()
cron.AddFunc("@every 2s", func() { wg.Done() })
_, _ = cron.AddFunc("@every 2s", func() { wg.Done() })
cron.Start()
defer cron.Stop()
@@ -232,12 +229,12 @@ func TestMultipleEntries(t *testing.T) {
wg.Add(2)
cron := newWithSeconds()
cron.AddFunc("0 0 0 1 1 ?", func() {})
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("0 0 0 1 1 ?", func() {})
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
id1, _ := cron.AddFunc("* * * * * ?", func() { t.Fatal() })
id2, _ := cron.AddFunc("* * * * * ?", func() { t.Fatal() })
cron.AddFunc("0 0 0 31 12 ?", func() {})
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("0 0 0 31 12 ?", func() {})
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
cron.Remove(id1)
cron.Start()
@@ -257,9 +254,9 @@ func TestRunningJobTwice(t *testing.T) {
wg.Add(2)
cron := newWithSeconds()
cron.AddFunc("0 0 0 1 1 ?", func() {})
cron.AddFunc("0 0 0 31 12 ?", func() {})
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("0 0 0 1 1 ?", func() {})
_, _ = cron.AddFunc("0 0 0 31 12 ?", func() {})
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
cron.Start()
defer cron.Stop()
@@ -276,9 +273,9 @@ func TestRunningMultipleSchedules(t *testing.T) {
wg.Add(2)
cron := newWithSeconds()
cron.AddFunc("0 0 0 1 1 ?", func() {})
cron.AddFunc("0 0 0 31 12 ?", func() {})
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("0 0 0 1 1 ?", func() {})
_, _ = cron.AddFunc("0 0 0 31 12 ?", func() {})
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
cron.Schedule(Every(time.Minute), FuncJob(func() {}))
cron.Schedule(Every(time.Second), FuncJob(func() { wg.Done() }))
cron.Schedule(Every(time.Hour), FuncJob(func() {}))
@@ -310,7 +307,7 @@ func TestLocalTimezone(t *testing.T) {
now.Second()+1, now.Second()+2, now.Minute(), now.Hour(), now.Day(), now.Month())
cron := newWithSeconds()
cron.AddFunc(spec, func() { wg.Done() })
_, _ = cron.AddFunc(spec, func() { wg.Done() })
cron.Start()
defer cron.Stop()
@@ -344,7 +341,7 @@ func TestNonLocalTimezone(t *testing.T) {
now.Second()+1, now.Second()+2, now.Minute(), now.Hour(), now.Day(), now.Month())
cron := New(WithLocation(loc), WithParser(secondParser))
cron.AddFunc(spec, func() { wg.Done() })
_, _ = cron.AddFunc(spec, func() { wg.Done() })
cron.Start()
defer cron.Stop()
@@ -386,7 +383,7 @@ func TestBlockingRun(t *testing.T) {
wg.Add(1)
cron := newWithSeconds()
cron.AddFunc("* * * * * ?", func() { wg.Done() })
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
var unblockChan = make(chan struct{})
@@ -410,7 +407,7 @@ func TestStartNoop(t *testing.T) {
var tickChan = make(chan struct{}, 2)
cron := newWithSeconds()
cron.AddFunc("* * * * * ?", func() {
_, _ = cron.AddFunc("* * * * * ?", func() {
tickChan <- struct{}{}
})
@@ -438,10 +435,10 @@ func TestJob(t *testing.T) {
wg.Add(1)
cron := newWithSeconds()
cron.AddJob("0 0 0 30 Feb ?", testJob{wg, "job0"})
cron.AddJob("0 0 0 1 1 ?", testJob{wg, "job1"})
_, _ = cron.AddJob("0 0 0 30 Feb ?", testJob{wg, "job0"})
_, _ = cron.AddJob("0 0 0 1 1 ?", testJob{wg, "job1"})
job2, _ := cron.AddJob("* * * * * ?", testJob{wg, "job2"})
cron.AddJob("1 0 0 1 1 ?", testJob{wg, "job3"})
_, _ = cron.AddJob("1 0 0 1 1 ?", testJob{wg, "job3"})
cron.Schedule(Every(5*time.Second+5*time.Nanosecond), testJob{wg, "job4"})
job5 := cron.Schedule(Every(5*time.Minute), testJob{wg, "job5"})
@@ -465,7 +462,7 @@ func TestJob(t *testing.T) {
// Ensure the entries are in the right order.
expecteds := []string{"job2", "job4", "job5", "job1", "job3", "job0"}
var actuals []string
var actuals = make([]string, 0, len(cron.Entries()))
for _, entry := range cron.Entries() {
actuals = append(actuals, entry.Job.(testJob).name)
}
@@ -545,7 +542,7 @@ func (*ZeroSchedule) Next(time.Time) time.Time {
func TestJobWithZeroTimeDoesNotRun(t *testing.T) {
cron := newWithSeconds()
var calls int64
cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) })
_, _ = cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) })
cron.Schedule(new(ZeroSchedule), FuncJob(func() { t.Error("expected zero task will not run") }))
cron.Start()
defer cron.Stop()
@@ -582,11 +579,11 @@ func TestStopAndWait(t *testing.T) {
t.Run("a couple fast jobs added, still returns immediately", func(t *testing.T) {
cron := newWithSeconds()
cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() {})
cron.Start()
cron.AddFunc("* * * * * *", func() {})
cron.AddFunc("* * * * * *", func() {})
cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() {})
time.Sleep(time.Second)
ctx := cron.Stop()
select {
@@ -598,10 +595,10 @@ func TestStopAndWait(t *testing.T) {
t.Run("a couple fast jobs and a slow job added, waits for slow job", func(t *testing.T) {
cron := newWithSeconds()
cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() {})
cron.Start()
cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) })
cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) })
_, _ = cron.AddFunc("* * * * * *", func() {})
time.Sleep(time.Second)
ctx := cron.Stop()
@@ -625,10 +622,10 @@ func TestStopAndWait(t *testing.T) {
t.Run("repeated calls to stop, waiting for completion and after", func(t *testing.T) {
cron := newWithSeconds()
cron.AddFunc("* * * * * *", func() {})
cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) })
_, _ = cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) })
cron.Start()
cron.AddFunc("* * * * * *", func() {})
_, _ = cron.AddFunc("* * * * * *", func() {})
time.Sleep(time.Second)
ctx := cron.Stop()
ctx2 := cron.Stop()

View File

@@ -9,10 +9,10 @@ import (
)
// DefaultLogger is used by Cron if none is specified.
var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))
var DefaultLogger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))
// DiscardLogger can be used by callers to discard all log messages.
var DiscardLogger Logger = PrintfLogger(log.New(ioutil.Discard, "", 0))
var DiscardLogger = PrintfLogger(log.New(ioutil.Discard, "", 0))
// Logger is the interface used in this package for logging, so that any backend
// can be plugged in. It is a subset of the github.com/go-logr/logr interface.

View File

@@ -30,7 +30,7 @@ func TestWithVerboseLogger(t *testing.T) {
t.Error("expected provided logger")
}
c.AddFunc("@every 1s", func() {})
_, _ = c.AddFunc("@every 1s", func() {})
c.Start()
time.Sleep(OneSecond)
c.Stop()

View File

@@ -304,6 +304,7 @@ func TestNormalizeFields_Errors(t *testing.T) {
actual, err := normalizeFields(test.input, test.options)
if err == nil {
t.Errorf("expected an error, got none. results: %v", actual)
return
}
if !strings.Contains(err.Error(), test.err) {
t.Errorf("expected error %q, got %q", test.err, err.Error())

View File

@@ -178,8 +178,8 @@ WRAP:
// restrictions are satisfied by the given time.
func dayMatches(s *SpecSchedule, t time.Time) bool {
var (
domMatch bool = 1<<uint(t.Day())&s.Dom > 0
dowMatch bool = 1<<uint(t.Weekday())&s.Dow > 0
domMatch = 1<<uint(t.Day())&s.Dom > 0
dowMatch = 1<<uint(t.Weekday())&s.Dow > 0
)
if s.Dom&starBit > 0 || s.Dow&starBit > 0 {
return domMatch && dowMatch

View File

@@ -1,17 +1,25 @@
package main
import (
"context"
"crawlab/config"
"crawlab/database"
"crawlab/lib/validate_bridge"
"crawlab/middlewares"
"crawlab/model"
"crawlab/routes"
"crawlab/services"
"github.com/apex/log"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/spf13/viper"
"net"
"net/http"
"os"
"os/signal"
"runtime/debug"
"syscall"
"time"
)
func main() {
@@ -57,7 +65,7 @@ func main() {
}
log.Info("初始化Redis数据库成功")
if services.IsMaster() {
if model.IsMaster() {
// 初始化定时任务
if err := services.InitScheduler(); err != nil {
log.Error("init scheduler error:" + err.Error())
@@ -99,7 +107,7 @@ func main() {
log.Info("初始化用户服务成功")
// 以下为主节点服务
if services.IsMaster() {
if model.IsMaster() {
// 中间件
app.Use(middlewares.CORSMiddleware())
//app.Use(middlewares.AuthorizationMiddleware())
@@ -131,6 +139,7 @@ func main() {
authGroup.POST("/spiders/:id/file", routes.PostSpiderFile) // 爬虫目录写入
authGroup.GET("/spiders/:id/dir", routes.GetSpiderDir) // 爬虫目录
authGroup.GET("/spiders/:id/stats", routes.GetSpiderStats) // 爬虫统计数据
authGroup.GET("/spider/types", routes.GetSpiderTypes) // 爬虫类型
// 任务
authGroup.GET("/tasks", routes.GetTaskList) // 任务列表
authGroup.GET("/tasks/:id", routes.GetTask) // 任务详情
@@ -164,8 +173,26 @@ func main() {
// 运行服务器
host := viper.GetString("server.host")
port := viper.GetString("server.port")
if err := app.Run(host + ":" + port); err != nil {
address := net.JoinHostPort(host, port)
srv := &http.Server{
Handler: app,
Addr: address,
}
go func() {
if err := srv.ListenAndServe(); err != nil {
if err != http.ErrServerClosed {
log.Error("run server error:" + err.Error())
} else {
log.Info("server graceful down")
}
}
}()
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
ctx2, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
if err := srv.Shutdown(ctx2); err != nil {
log.Error("run server error:" + err.Error())
panic(err)
}
}

View File

@@ -1,6 +1,7 @@
package mock
import (
"crawlab/entity"
"crawlab/model"
"crawlab/services"
"github.com/apex/log"
@@ -97,14 +98,14 @@ var dataList = []services.Data{
},
}
var executeble = []model.Executable{
var executeble = []entity.Executable{
{
Path: "/test",
FileName: "test.py",
DisplayName: "test.py",
},
}
var systemInfo = model.SystemInfo{ARCH: "x86",
var systemInfo = entity.SystemInfo{ARCH: "x86",
OS: "linux",
Hostname: "test",
NumCpu: 4,
@@ -187,7 +188,7 @@ func DeleteNode(c *gin.Context) {
id := bson.ObjectId("5d429e6c19f7abede924fee2")
for _, node := range NodeList {
if node.Id == bson.ObjectId(id) {
if node.Id == id {
log.Infof("Delete a node")
}
}

View File

@@ -1,11 +1,24 @@
package model
import (
"crawlab/database"
"crawlab/utils"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"os"
"runtime/debug"
"time"
)
type GridFs struct {
Id bson.ObjectId `json:"_id" bson:"_id"`
ChunkSize int32 `json:"chunk_size" bson:"chunkSize"`
UploadDate time.Time `json:"upload_date" bson:"uploadDate"`
Length int32 `json:"length" bson:"length"`
Md5 string `json:"md_5" bson:"md5"`
Filename string `json:"filename" bson:"filename"`
}
type File struct {
Name string `json:"name"`
Path string `json:"path"`
@@ -13,12 +26,49 @@ type File struct {
Size int64 `json:"size"`
}
// Remove deletes this file from the "files" GridFS collection by ID.
// Failures are logged with a stack trace but not returned to the caller.
func (f *GridFs) Remove() {
	s, gf := database.GetGridFs("files")
	defer s.Close()

	if err := gf.RemoveId(f.Id); err != nil {
		log.Errorf("remove file id error: %s, id: %s", err.Error(), f.Id.Hex())
		debug.PrintStack()
	}
}
// GetAllGridFs returns every file record in the "files" GridFS collection,
// or nil if the query fails (the error is logged with a stack trace).
func GetAllGridFs() []*GridFs {
	s, gf := database.GetGridFs("files")
	defer s.Close()

	var files []*GridFs
	if err := gf.Find(nil).All(&files); err != nil {
		// Use %s, not "{}": apex/log's Errorf takes fmt-style verbs, so
		// "{}" would print literally and drop the error text.
		log.Errorf("get all files error: %s", err.Error())
		debug.PrintStack()
		return nil
	}
	return files
}
// GetGridFs looks up a single file record in the "files" GridFS collection
// by its ObjectId. It returns nil if the lookup fails (the error, including
// not-found, is logged with a stack trace).
func GetGridFs(id bson.ObjectId) *GridFs {
	s, gf := database.GetGridFs("files")
	defer s.Close()

	var gfFile GridFs
	err := gf.Find(bson.M{"_id": id}).One(&gfFile)
	if err != nil {
		log.Errorf("get gf file error: %s, file_id: %s", err.Error(), id.Hex())
		debug.PrintStack()
		return nil
	}
	return &gfFile
}
func RemoveFile(path string) error {
if !utils.Exists(path) {
log.Info("file not found: " + path)
debug.PrintStack()
return nil
}
if err := os.Remove(path); err != nil {
if err := os.RemoveAll(path); err != nil {
return err
}
return nil

View File

@@ -1,6 +1,7 @@
package model
import (
"crawlab/utils"
"github.com/apex/log"
"os"
"runtime/debug"
@@ -21,7 +22,7 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) {
debug.PrintStack()
return nil, err
}
defer f.Close()
defer utils.Close(f)
const bufLen = 2 * 1024 * 1024
logBuf := make([]byte, bufLen)

View File

@@ -4,9 +4,11 @@ import (
"crawlab/constants"
"crawlab/database"
"crawlab/services/register"
"errors"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/spf13/viper"
"runtime/debug"
"time"
)
@@ -30,6 +32,72 @@ type Node struct {
UpdateTsUnix int64 `json:"update_ts_unix" bson:"update_ts_unix"`
}
const (
	// Yes is the configuration flag value meaning "enabled".
	Yes = "Y"
)

// IsMaster reports whether this process is configured as the master node
// (config key "server.master" equals "Y").
func IsMaster() bool {
	return viper.GetString("server.master") == Yes
}
// GetCurrentNode returns the Node record for this machine, looked up by its
// registered key.
//
// On the master node a failed lookup means first-time registration: the node
// record is built from local machine info and inserted. On a worker node the
// lookup is retried every 5 seconds, up to 10 times, to wait for the master
// to come up; after that an error is returned (instead of panicking) so the
// caller decides how to fail.
func GetCurrentNode() (Node, error) {
	// Key under which this machine registered itself.
	key, err := register.GetRegister().GetKey()
	if err != nil {
		return Node{}, err
	}

	// Fetch the current node from the database, retrying on failure.
	var node Node
	errNum := 0
	for {
		// Give up after 10 failed attempts.
		if errNum >= 10 {
			return Node{}, errors.New("cannot get current node")
		}

		// Try to fetch the node by its key.
		node, err = GetNodeByKey(key)
		if err != nil {
			// On the master a missing record means this is the first
			// registration: insert the node info.
			if IsMaster() {
				// Collect local machine info.
				ip, mac, key, err := GetNodeBaseInfo()
				if err != nil {
					debug.PrintStack()
					return node, err
				}

				// Build the node record.
				node = Node{
					Key:      key,
					Id:       bson.NewObjectId(),
					Ip:       ip,
					Name:     ip,
					Mac:      mac,
					IsMaster: true,
				}
				if err := node.Add(); err != nil {
					return node, err
				}
				return node, nil
			}

			// Count the failure and retry after 5 seconds.
			errNum++
			time.Sleep(5 * time.Second)
			continue
		}

		// Lookup succeeded.
		break
	}

	return node, nil
}
func (n *Node) Save() error {
s, c := database.GetCol("nodes")
defer s.Close()
@@ -89,15 +157,20 @@ func GetNodeList(filter interface{}) ([]Node, error) {
}
func GetNode(id bson.ObjectId) (Node, error) {
var node Node
if id.Hex() == "" {
log.Infof("id is empty")
debug.PrintStack()
return node, errors.New("id is empty")
}
s, c := database.GetCol("nodes")
defer s.Close()
var node Node
if err := c.FindId(id).One(&node); err != nil {
if err != mgo.ErrNotFound {
log.Errorf(err.Error())
debug.PrintStack()
}
log.Errorf(err.Error())
debug.PrintStack()
return node, err
}
return node, nil

View File

@@ -5,6 +5,7 @@ import (
"crawlab/database"
"crawlab/lib/cron"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"runtime/debug"
"time"
@@ -16,6 +17,7 @@ type Schedule struct {
Description string `json:"description" bson:"description"`
SpiderId bson.ObjectId `json:"spider_id" bson:"spider_id"`
NodeId bson.ObjectId `json:"node_id" bson:"node_id"`
NodeKey string `json:"node_key" bson:"node_key"`
Cron string `json:"cron" bson:"cron"`
EntryId cron.EntryID `json:"entry_id" bson:"entry_id"`
Param string `json:"param" bson:"param"`
@@ -38,6 +40,33 @@ func (sch *Schedule) Save() error {
return nil
}
// Delete removes this schedule from the "schedules" collection by ID.
func (sch *Schedule) Delete() error {
	s, c := database.GetCol("schedules")
	defer s.Close()

	return c.RemoveId(sch.Id)
}
// SyncNodeIdAndSpiderId updates this schedule's node and spider references
// to match the given records, persisting the schedule if either changed.
func (sch *Schedule) SyncNodeIdAndSpiderId(node Node, spider Spider) {
	sch.syncNodeId(node)
	sch.syncSpiderId(spider)
}
// syncNodeId points the schedule at node's ID and saves it, doing nothing
// when the ID is already current.
// NOTE(review): the Save error is deliberately discarded here — a failed
// sync leaves the stored schedule stale; confirm this is acceptable.
func (sch *Schedule) syncNodeId(node Node) {
	if node.Id.Hex() == sch.NodeId.Hex() {
		return
	}
	sch.NodeId = node.Id
	_ = sch.Save()
}
// syncSpiderId points the schedule at spider's ID and saves it, doing
// nothing when the ID is already current.
// NOTE(review): the Save error is deliberately discarded, mirroring
// syncNodeId — confirm this is acceptable.
func (sch *Schedule) syncSpiderId(spider Spider) {
	if spider.Id.Hex() == sch.SpiderId.Hex() {
		return
	}
	sch.SpiderId = spider.Id
	_ = sch.Save()
}
func GetScheduleList(filter interface{}) ([]Schedule, error) {
s, c := database.GetCol("schedules")
defer s.Close()
@@ -47,11 +76,12 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) {
return schedules, err
}
for i, schedule := range schedules {
var schs []Schedule
for _, schedule := range schedules {
// 获取节点名称
if schedule.NodeId == bson.ObjectIdHex(constants.ObjectIdNull) {
// 选择所有节点
schedules[i].NodeName = "All Nodes"
schedule.NodeName = "All Nodes"
} else {
// 选择单一节点
node, err := GetNode(schedule.NodeId)
@@ -59,18 +89,21 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) {
log.Errorf(err.Error())
continue
}
schedules[i].NodeName = node.Name
schedule.NodeName = node.Name
}
// 获取爬虫名称
spider, err := GetSpider(schedule.SpiderId)
if err != nil {
log.Errorf(err.Error())
if err != nil && err == mgo.ErrNotFound {
log.Errorf("get spider by id: %s, error: %s", schedule.SpiderId.Hex(), err.Error())
debug.PrintStack()
_ = schedule.Delete()
continue
}
schedules[i].SpiderName = spider.Name
schedule.SpiderName = spider.Name
schs = append(schs, schedule)
}
return schedules, nil
return schs, nil
}
func GetSchedule(id bson.ObjectId) (Schedule, error) {
@@ -92,7 +125,12 @@ func UpdateSchedule(id bson.ObjectId, item Schedule) error {
if err := c.FindId(id).One(&result); err != nil {
return err
}
node, err := GetNode(item.NodeId)
if err != nil {
return err
}
item.NodeKey = node.Key
if err := item.Save(); err != nil {
return err
}
@@ -103,9 +141,15 @@ func AddSchedule(item Schedule) error {
s, c := database.GetCol("schedules")
defer s.Close()
node, err := GetNode(item.NodeId)
if err != nil {
return err
}
item.Id = bson.NewObjectId()
item.CreateTs = time.Now()
item.UpdateTs = time.Now()
item.NodeKey = node.Key
if err := c.Insert(&item); err != nil {
debug.PrintStack()

View File

@@ -2,6 +2,7 @@ package model
import (
"crawlab/database"
"crawlab/entity"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
@@ -18,12 +19,12 @@ type Spider struct {
Id bson.ObjectId `json:"_id" bson:"_id"` // 爬虫ID
Name string `json:"name" bson:"name"` // 爬虫名称(唯一)
DisplayName string `json:"display_name" bson:"display_name"` // 爬虫显示名称
Type string `json:"type"` // 爬虫类别
Type string `json:"type" bson:"type"` // 爬虫类别
FileId bson.ObjectId `json:"file_id" bson:"file_id"` // GridFS文件ID
Col string `json:"col"` // 结果储存位置
Site string `json:"site"` // 爬虫网站
Col string `json:"col" bson:"col"` // 结果储存位置
Site string `json:"site" bson:"site"` // 爬虫网站
Envs []Env `json:"envs" bson:"envs"` // 环境变量
Remark string `json:"remark"` // 备注
Remark string `json:"remark" bson:"remark"` // 备注
// 自定义爬虫
Src string `json:"src" bson:"src"` // 源码位置
Cmd string `json:"cmd" bson:"cmd"` // 执行命令
@@ -47,6 +48,7 @@ type Spider struct {
UpdateTs time.Time `json:"update_ts" bson:"update_ts"`
}
// 更新爬虫
func (spider *Spider) Save() error {
s, c := database.GetCol("spiders")
defer s.Close()
@@ -60,6 +62,7 @@ func (spider *Spider) Save() error {
return nil
}
// 新增爬虫
func (spider *Spider) Add() error {
s, c := database.GetCol("spiders")
defer s.Close()
@@ -74,6 +77,7 @@ func (spider *Spider) Add() error {
return nil
}
// 获取爬虫的任务
func (spider *Spider) GetTasks() ([]Task, error) {
tasks, err := GetTaskList(bson.M{"spider_id": spider.Id}, 0, 10, "-create_ts")
if err != nil {
@@ -82,6 +86,7 @@ func (spider *Spider) GetTasks() ([]Task, error) {
return tasks, nil
}
// 爬虫最新的任务
func (spider *Spider) GetLastTask() (Task, error) {
tasks, err := GetTaskList(bson.M{"spider_id": spider.Id}, 0, 1, "-create_ts")
if err != nil {
@@ -93,15 +98,22 @@ func (spider *Spider) GetLastTask() (Task, error) {
return tasks[0], nil
}
func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) {
// Delete removes this spider from the "spiders" collection by ID. It does
// not touch the spider's GridFS file (compare RemoveSpider).
func (spider *Spider) Delete() error {
	s, c := database.GetCol("spiders")
	defer s.Close()

	return c.RemoveId(spider.Id)
}
// 爬虫列表
func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, int, error) {
s, c := database.GetCol("spiders")
defer s.Close()
// 获取爬虫列表
spiders := []Spider{}
var spiders []Spider
if err := c.Find(filter).Skip(skip).Limit(limit).Sort("+name").All(&spiders); err != nil {
debug.PrintStack()
return spiders, err
return spiders, 0, err
}
// 遍历爬虫列表
@@ -119,9 +131,40 @@ func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) {
spiders[i].LastStatus = task.Status
}
return spiders, nil
count, _ := c.Find(filter).Count()
return spiders, count, nil
}
// 获取爬虫
// GetSpiderByFileId returns the spider whose GridFS file ID matches fileId,
// or nil if the lookup fails (the error, including not-found, is logged
// with a stack trace).
// NOTE(review): result is a *Spider and &result is a **Spider — mgo decodes
// through the extra indirection, but a plain value would be clearer.
func GetSpiderByFileId(fileId bson.ObjectId) *Spider {
	s, c := database.GetCol("spiders")
	defer s.Close()

	var result *Spider
	if err := c.Find(bson.M{"file_id": fileId}).One(&result); err != nil {
		log.Errorf("get spider error: %s, file_id: %s", err.Error(), fileId.Hex())
		debug.PrintStack()
		return nil
	}
	return result
}
// 获取爬虫
// GetSpiderByName returns the spider with the given (unique) name, or nil
// if the lookup fails (the error, including not-found, is logged with a
// stack trace).
// NOTE(review): &result is a **Spider, same indirection as GetSpiderByFileId.
func GetSpiderByName(name string) *Spider {
	s, c := database.GetCol("spiders")
	defer s.Close()

	var result *Spider
	if err := c.Find(bson.M{"name": name}).One(&result); err != nil {
		log.Errorf("get spider error: %s, spider_name: %s", err.Error(), name)
		debug.PrintStack()
		return nil
	}
	return result
}
// 获取爬虫
func GetSpider(id bson.ObjectId) (Spider, error) {
s, c := database.GetCol("spiders")
defer s.Close()
@@ -129,6 +172,7 @@ func GetSpider(id bson.ObjectId) (Spider, error) {
var result Spider
if err := c.FindId(id).One(&result); err != nil {
if err != mgo.ErrNotFound {
log.Errorf("get spider error: %s, id: %id", err.Error(), id.Hex())
debug.PrintStack()
}
return result, err
@@ -136,6 +180,7 @@ func GetSpider(id bson.ObjectId) (Spider, error) {
return result, nil
}
// 更新爬虫
func UpdateSpider(id bson.ObjectId, item Spider) error {
s, c := database.GetCol("spiders")
defer s.Close()
@@ -152,6 +197,7 @@ func UpdateSpider(id bson.ObjectId, item Spider) error {
return nil
}
// 删除爬虫
func RemoveSpider(id bson.ObjectId) error {
s, c := database.GetCol("spiders")
defer s.Close()
@@ -162,6 +208,8 @@ func RemoveSpider(id bson.ObjectId) error {
}
if err := c.RemoveId(id); err != nil {
log.Errorf("remove spider error: %s, id:%s", err.Error(), id.Hex())
debug.PrintStack()
return err
}
@@ -171,17 +219,19 @@ func RemoveSpider(id bson.ObjectId) error {
if err := gf.RemoveId(result.FileId); err != nil {
log.Error("remove file error, id:" + result.FileId.Hex())
debug.PrintStack()
return err
}
return nil
}
// 删除所有爬虫
func RemoveAllSpider() error {
s, c := database.GetCol("spiders")
defer s.Close()
spiders := []Spider{}
var spiders []Spider
err := c.Find(nil).All(&spiders)
if err != nil {
log.Error("get all spiders error:" + err.Error())
@@ -195,6 +245,7 @@ func RemoveAllSpider() error {
return nil
}
// 爬虫总数
func GetSpiderCount() (int, error) {
s, c := database.GetCol("spiders")
defer s.Close()
@@ -203,6 +254,26 @@ func GetSpiderCount() (int, error) {
if err != nil {
return 0, err
}
return count, nil
}
// 爬虫类型
// GetSpiderTypes aggregates the "spiders" collection by the "type" field
// and returns one entry per distinct type together with the number of
// spiders of that type.
func GetSpiderTypes() ([]*entity.SpiderType, error) {
	session, col := database.GetCol("spiders")
	defer session.Close()
	// $group by type, counting documents per group.
	pipeline := []bson.M{
		{
			"$group": bson.M{
				"_id":   "$type",
				"count": bson.M{"$sum": 1},
			},
		},
	}
	var spiderTypes []*entity.SpiderType
	if err := col.Pipe(pipeline).All(&spiderTypes); err != nil {
		log.Errorf("get spider types error: %s", err.Error())
		debug.PrintStack()
		return nil, err
	}
	return spiderTypes, nil
}

View File

@@ -1,6 +1,7 @@
package model
import (
"crawlab/entity"
"github.com/apex/log"
"io/ioutil"
"os"
@@ -35,21 +36,7 @@ var executableNameMap = map[string]string{
"bash": "bash",
}
type SystemInfo struct {
ARCH string `json:"arch"`
OS string `json:"os"`
Hostname string `json:"host_name"`
NumCpu int `json:"num_cpu"`
Executables []Executable `json:"executables"`
}
type Executable struct {
Path string `json:"path"`
FileName string `json:"file_name"`
DisplayName string `json:"display_name"`
}
func GetLocalSystemInfo() (sysInfo SystemInfo, err error) {
func GetLocalSystemInfo() (sysInfo entity.SystemInfo, err error) {
executables, err := GetExecutables()
if err != nil {
return sysInfo, err
@@ -60,7 +47,7 @@ func GetLocalSystemInfo() (sysInfo SystemInfo, err error) {
return sysInfo, err
}
return SystemInfo{
return entity.SystemInfo{
ARCH: runtime.GOARCH,
OS: runtime.GOOS,
NumCpu: runtime.GOMAXPROCS(0),
@@ -78,7 +65,7 @@ func GetPathValues() (paths []string) {
return strings.Split(pathEnv, ":")
}
func GetExecutables() (executables []Executable, err error) {
func GetExecutables() (executables []entity.Executable, err error) {
pathValues := GetPathValues()
cache := map[string]string{}
@@ -97,7 +84,7 @@ func GetExecutables() (executables []Executable, err error) {
if cache[filePath] == "" {
if displayName != "" {
executables = append(executables, Executable{
executables = append(executables, entity.Executable{
Path: filePath,
FileName: file.Name(),
DisplayName: displayName,

View File

@@ -4,7 +4,6 @@ import (
"crawlab/constants"
"crawlab/database"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"runtime/debug"
"time"
@@ -25,6 +24,7 @@ type Task struct {
WaitDuration float64 `json:"wait_duration" bson:"wait_duration"`
RuntimeDuration float64 `json:"runtime_duration" bson:"runtime_duration"`
TotalDuration float64 `json:"total_duration" bson:"total_duration"`
Pid int `json:"pid" bson:"pid"`
// 前端数据
SpiderName string `json:"spider_name"`
@@ -117,20 +117,16 @@ func GetTaskList(filter interface{}, skip int, limit int, sortKey string) ([]Tas
for i, task := range tasks {
// 获取爬虫名称
spider, err := task.GetSpider()
if err == mgo.ErrNotFound {
// do nothing
} else if err != nil {
return tasks, err
if err != nil || spider.Id.Hex() == "" {
_ = spider.Delete()
} else {
tasks[i].SpiderName = spider.DisplayName
}
// 获取节点名称
node, err := task.GetNode()
if err == mgo.ErrNotFound {
// do nothing
} else if err != nil {
return tasks, err
if node.Id.Hex() == "" || err != nil {
_ = task.Delete()
} else {
tasks[i].NodeName = node.Name
}
@@ -191,6 +187,7 @@ func RemoveTask(id string) error {
return nil
}
// 删除task by spider_id
func RemoveTaskBySpiderId(id bson.ObjectId) error {
tasks, err := GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts")
if err != nil {
@@ -206,6 +203,7 @@ func RemoveTaskBySpiderId(id bson.ObjectId) error {
return nil
}
// task 总数
func GetTaskCount(query interface{}) (int, error) {
s, c := database.GetCol("tasks")
defer s.Close()
@@ -308,6 +306,7 @@ func GetDailyTaskStats(query bson.M) ([]TaskDailyItem, error) {
return dailyItems, nil
}
// 更新task的结果数
func UpdateTaskResultCount(id string) (err error) {
// 获取任务
task, err := GetTask(id)
@@ -343,3 +342,25 @@ func UpdateTaskResultCount(id string) (err error) {
}
return nil
}
// UpdateTaskToAbnormal marks every task still flagged as running on the
// given node as abnormal. A task left in the running state after a node
// restart can no longer finish normally, so its status is corrected here.
func UpdateTaskToAbnormal(nodeId bson.ObjectId) error {
	session, col := database.GetCol("tasks")
	defer session.Close()
	_, err := col.UpdateAll(
		bson.M{
			"node_id": nodeId,
			"status":  constants.StatusRunning,
		},
		bson.M{
			"$set": bson.M{
				"status": constants.StatusAbnormal,
			},
		},
	)
	if err != nil {
		log.Errorf("update task to abnormal error: %s, node_id : %s", err.Error(), nodeId.Hex())
		debug.PrintStack()
		return err
	}
	return nil
}

View File

@@ -15,9 +15,9 @@ func GetNodeList(c *gin.Context) {
return
}
for i, node := range nodes {
nodes[i].IsMaster = services.IsMasterNode(node.Id.Hex())
}
//for i, node := range nodes {
// nodes[i].IsMaster = services.IsMasterNode(node.Id.Hex())
//}
c.JSON(http.StatusOK, Response{
Status: "ok",
@@ -109,11 +109,11 @@ func GetSystemInfo(c *gin.Context) {
})
}
func DeleteNode(c *gin.Context) {
func DeleteNode(c *gin.Context) {
id := c.Param("id")
node, err := model.GetNode(bson.ObjectIdHex(id))
if err != nil {
HandleError(http.StatusInternalServerError, c ,err)
HandleError(http.StatusInternalServerError, c, err)
return
}
err = node.Delete()

View File

@@ -1,7 +1,6 @@
package routes
import (
"crawlab/constants"
"crawlab/model"
"crawlab/services"
"github.com/gin-gonic/gin"
@@ -46,13 +45,14 @@ func PostSchedule(c *gin.Context) {
HandleError(http.StatusBadRequest, c, err)
return
}
newItem.Id = bson.ObjectIdHex(id)
// 如果node_id为空则置为空ObjectId
if newItem.NodeId == "" {
newItem.NodeId = bson.ObjectIdHex(constants.ObjectIdNull)
// 验证cron表达式
if err := services.ParserCron(newItem.Cron); err != nil {
HandleError(http.StatusOK, c, err)
return
}
newItem.Id = bson.ObjectIdHex(id)
// 更新数据库
if err := model.UpdateSchedule(bson.ObjectIdHex(id), newItem); err != nil {
HandleError(http.StatusInternalServerError, c, err)
@@ -80,9 +80,10 @@ func PutSchedule(c *gin.Context) {
return
}
// 如果node_id为空则置为空ObjectId
if item.NodeId == "" {
item.NodeId = bson.ObjectIdHex(constants.ObjectIdNull)
// 验证cron表达式
if err := services.ParserCron(item.Cron); err != nil {
HandleError(http.StatusOK, c, err)
return
}
// 更新数据库

View File

@@ -3,6 +3,7 @@ package routes
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/services"
"crawlab/utils"
@@ -11,7 +12,7 @@ import (
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"github.com/satori/go.uuid"
"github.com/spf13/viper"
"io/ioutil"
"net/http"
@@ -24,7 +25,22 @@ import (
)
func GetSpiderList(c *gin.Context) {
results, err := model.GetSpiderList(nil, 0, 0)
pageNum, _ := c.GetQuery("pageNum")
pageSize, _ := c.GetQuery("pageSize")
keyword, _ := c.GetQuery("keyword")
t, _ := c.GetQuery("type")
filter := bson.M{
"name": bson.M{"$regex": bson.RegEx{Pattern: keyword, Options: "im"}},
}
if t != "" {
filter["type"] = t
}
page := &entity.Page{}
page.GetPage(pageNum, pageSize)
results, count, err := model.GetSpiderList(filter, page.Skip, page.Limit)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
@@ -32,7 +48,7 @@ func GetSpiderList(c *gin.Context) {
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
Data: results,
Data: bson.M{"list": results, "total": count},
})
}
@@ -79,18 +95,6 @@ func PostSpider(c *gin.Context) {
})
}
func PublishAllSpiders(c *gin.Context) {
if err := services.PublishAllSpiders(); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
})
}
func PublishSpider(c *gin.Context) {
id := c.Param("id")
@@ -104,10 +108,7 @@ func PublishSpider(c *gin.Context) {
return
}
if err := services.PublishSpider(spider); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
services.PublishSpider(spider)
c.JSON(http.StatusOK, Response{
Status: "ok",
@@ -117,7 +118,7 @@ func PublishSpider(c *gin.Context) {
func PutSpider(c *gin.Context) {
// 从body中获取文件
file, err := c.FormFile("file")
uploadFile, err := c.FormFile("file")
if err != nil {
debug.PrintStack()
HandleError(http.StatusInternalServerError, c, err)
@@ -125,7 +126,7 @@ func PutSpider(c *gin.Context) {
}
// 如果不为zip文件返回错误
if !strings.HasSuffix(file.Filename, ".zip") {
if !strings.HasSuffix(uploadFile.Filename, ".zip") {
debug.PrintStack()
HandleError(http.StatusBadRequest, c, errors.New("Not a valid zip file"))
return
@@ -134,7 +135,7 @@ func PutSpider(c *gin.Context) {
// 以防tmp目录不存在
tmpPath := viper.GetString("other.tmppath")
if !utils.Exists(tmpPath) {
if err := os.Mkdir(tmpPath, os.ModePerm); err != nil {
if err := os.MkdirAll(tmpPath, os.ModePerm); err != nil {
log.Error("mkdir other.tmppath dir error:" + err.Error())
debug.PrintStack()
HandleError(http.StatusBadRequest, c, errors.New("Mkdir other.tmppath dir error"))
@@ -145,57 +146,54 @@ func PutSpider(c *gin.Context) {
// 保存到本地临时文件
randomId := uuid.NewV4()
tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip")
if err := c.SaveUploadedFile(file, tmpFilePath); err != nil {
if err := c.SaveUploadedFile(uploadFile, tmpFilePath); err != nil {
log.Error("save upload file error: " + err.Error())
debug.PrintStack()
HandleError(http.StatusInternalServerError, c, err)
return
}
// 读取临时文件
tmpFile, err := os.OpenFile(tmpFilePath, os.O_RDONLY, 0777)
s, gf := database.GetGridFs("files")
defer s.Close()
// 判断文件是否已经存在
var gfFile model.GridFs
if err := gf.Find(bson.M{"filename": uploadFile.Filename}).One(&gfFile); err == nil {
// 已经存在文件,则删除
_ = gf.RemoveId(gfFile.Id)
}
// 上传到GridFs
fid, err := services.UploadToGridFs(uploadFile.Filename, tmpFilePath)
if err != nil {
log.Errorf("upload to grid fs error: %s", err.Error())
debug.PrintStack()
HandleError(http.StatusInternalServerError, c, err)
return
}
if err = tmpFile.Close(); err != nil {
debug.PrintStack()
HandleError(http.StatusInternalServerError, c, err)
return
}
// 目标目录
dstPath := filepath.Join(
viper.GetString("spider.path"),
strings.Replace(file.Filename, ".zip", "", 1),
)
idx := strings.LastIndex(uploadFile.Filename, "/")
targetFilename := uploadFile.Filename[idx+1:]
// 如果目标目录已存在,删除目标目录
if utils.Exists(dstPath) {
if err := os.RemoveAll(dstPath); err != nil {
debug.PrintStack()
HandleError(http.StatusInternalServerError, c, err)
// 判断爬虫是否存在
spiderName := strings.Replace(targetFilename, ".zip", "", 1)
spider := model.GetSpiderByName(spiderName)
if spider == nil {
// 保存爬虫信息
srcPath := viper.GetString("spider.path")
spider := model.Spider{
Name: spiderName,
DisplayName: spiderName,
Type: constants.Customized,
Src: filepath.Join(srcPath, spiderName),
FileId: fid,
}
_ = spider.Add()
} else {
// 更新file_id
spider.FileId = fid
_ = spider.Save()
}
// 将临时文件解压到爬虫目录
if err := utils.DeCompress(tmpFile, dstPath); err != nil {
debug.PrintStack()
HandleError(http.StatusInternalServerError, c, err)
return
}
// 删除临时文件
if err = os.Remove(tmpFilePath); err != nil {
debug.PrintStack()
HandleError(http.StatusInternalServerError, c, err)
return
}
// 更新爬虫
services.UpdateSpiders()
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
@@ -210,33 +208,7 @@ func DeleteSpider(c *gin.Context) {
return
}
// 获取该爬虫
spider, err := model.GetSpider(bson.ObjectIdHex(id))
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
// 删除爬虫文件目录
if err := os.RemoveAll(spider.Src); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
// 从数据库中删除该爬虫
if err := model.RemoveSpider(bson.ObjectIdHex(id)); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
// 删除日志文件
if err := services.RemoveLogBySpiderId(spider.Id); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
// 删除爬虫对应的task任务
if err := model.RemoveTaskBySpiderId(spider.Id); err != nil {
if err := services.RemoveSpider(id); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
@@ -284,7 +256,8 @@ func GetSpiderDir(c *gin.Context) {
}
// 获取目录下文件列表
f, err := ioutil.ReadDir(filepath.Join(spider.Src, path))
spiderPath := viper.GetString("spider.path")
f, err := ioutil.ReadDir(filepath.Join(spiderPath, spider.Name, path))
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
@@ -373,6 +346,20 @@ func PostSpiderFile(c *gin.Context) {
})
}
// 爬虫类型
func GetSpiderTypes(c *gin.Context) {
types, err := model.GetSpiderTypes()
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
Data: types,
})
}
func GetSpiderStats(c *gin.Context) {
type Overview struct {
TaskCount int `json:"task_count" bson:"task_count"`

View File

@@ -66,7 +66,7 @@ func (c *Context) failed(err error, httpCode int, variables ...interface{}) {
"message": "error",
"error": errStr,
})
break
case validator.ValidationErrors:
validatorErrors := causeError.(validator.ValidationErrors)
//firstError := validatorErrors[0].(validator.FieldError)
@@ -75,7 +75,6 @@ func (c *Context) failed(err error, httpCode int, variables ...interface{}) {
"message": "error",
"error": validatorErrors.Error(),
})
break
default:
fmt.Println("deprecated....")
c.AbortWithStatusJSON(httpCode, gin.H{

View File

@@ -3,9 +3,9 @@ package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/msg_handler"
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
@@ -15,6 +15,7 @@ import (
"os"
"path/filepath"
"runtime/debug"
"time"
)
// 任务日志频道映射
@@ -23,7 +24,7 @@ var TaskLogChanMap = utils.NewChanMap()
// 获取远端日志
func GetRemoteLog(task model.Task) (logStr string, err error) {
// 序列化消息
msg := msg_handler.NodeMessage{
msg := entity.NodeMessage{
Type: constants.MsgTypeGetLog,
LogPath: task.LogPath,
TaskId: task.Id,
@@ -45,8 +46,14 @@ func GetRemoteLog(task model.Task) (logStr string, err error) {
// 生成频道等待获取log
ch := TaskLogChanMap.ChanBlocked(task.Id)
// 此处阻塞,等待结果
logStr = <-ch
select {
case logStr = <-ch:
log.Infof("get remote log")
break
case <-time.After(30 * time.Second):
logStr = "get remote log timeout"
break
}
return logStr, nil
}
@@ -67,7 +74,7 @@ func DeleteLogPeriodically() {
for _, fi := range rd {
if fi.IsDir() {
log.Info(filepath.Join(logDir, fi.Name()))
os.RemoveAll(filepath.Join(logDir, fi.Name()))
_ = os.RemoveAll(filepath.Join(logDir, fi.Name()))
log.Info("Delete Log File Success")
}
}
@@ -85,21 +92,16 @@ func RemoveLocalLog(path string) error {
// 删除远程日志
func RemoveRemoteLog(task model.Task) error {
msg := msg_handler.NodeMessage{
msg := entity.NodeMessage{
Type: constants.MsgTypeRemoveLog,
LogPath: task.LogPath,
TaskId: task.Id,
}
msgBytes, err := json.Marshal(&msg)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// 发布获取日志消息
channel := "nodes:" + task.NodeId.Hex()
if _, err := database.RedisClient.Publish(channel, utils.BytesToString(msgBytes)); err != nil {
log.Errorf(err.Error())
if _, err := database.RedisClient.Publish(channel, utils.GetJson(msg)); err != nil {
log.Errorf("publish redis error: %s", err.Error())
debug.PrintStack()
return err
}
return nil
@@ -119,10 +121,12 @@ func RemoveLogByTaskId(id string) error {
func removeLog(t model.Task) {
if err := RemoveLocalLog(t.LogPath); err != nil {
log.Error("remove local log error:" + err.Error())
log.Errorf("remove local log error: %s", err.Error())
debug.PrintStack()
}
if err := RemoveRemoteLog(t); err != nil {
log.Error("remove remote log error:" + err.Error())
log.Errorf("remove remote log error: %s", err.Error())
debug.PrintStack()
}
}
@@ -130,7 +134,8 @@ func removeLog(t model.Task) {
func RemoveLogBySpiderId(id bson.ObjectId) error {
tasks, err := model.GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts")
if err != nil {
log.Error("get tasks error:" + err.Error())
log.Errorf("get tasks error: %s", err.Error())
debug.PrintStack()
}
for _, task := range tasks {
removeLog(task)

View File

@@ -13,10 +13,8 @@ import (
func TestDeleteLogPeriodically(t *testing.T) {
Convey("Test DeleteLogPeriodically", t, func() {
if err := config.InitConfig("../conf/config.yml"); err != nil {
log.Error("init config error:" + err.Error())
panic(err)
}
err := config.InitConfig("../conf/config.yml")
So(err, ShouldBeNil)
log.Info("初始化配置成功")
logDir := viper.GetString("log.path")
log.Info(logDir)
@@ -28,24 +26,16 @@ func TestGetLocalLog(t *testing.T) {
//create a log file for test
logPath := "../logs/crawlab/test.log"
f, err := os.Create(logPath)
defer f.Close()
defer utils.Close(f)
if err != nil {
fmt.Println(err.Error())
fmt.Println(err)
} else {
_, err = f.WriteString("This is for test")
fmt.Println(err)
}
Convey("Test GetLocalLog", t, func() {
Convey("Test response", func() {
logStr, err := GetLocalLog(logPath)
log.Info(utils.BytesToString(logStr))
fmt.Println(err)
So(err, ShouldEqual, nil)
})
})
//delete the test log file
os.Remove(logPath)
_ = os.Remove(logPath)
}

View File

@@ -2,47 +2,36 @@ package msg_handler
import (
"crawlab/constants"
"crawlab/model"
"crawlab/entity"
"github.com/apex/log"
)
// Handler is the common interface for inter-node message handlers; each
// message type resolved by GetMsgHandler provides its own implementation.
type Handler interface {
	// Handle performs the action requested by the message.
	Handle() error
}
func GetMsgHandler(msg NodeMessage) Handler {
// GetMsgHandler maps an inter-node message to the Handler implementation
// responsible for its type. Unrecognized message types yield nil, so
// callers must guard against a nil return before calling Handle.
func GetMsgHandler(msg entity.NodeMessage) Handler {
	log.Infof("received msg , type is : %s", msg.Type)
	switch msg.Type {
	case constants.MsgTypeGetLog, constants.MsgTypeRemoveLog:
		// log-related messages
		return &Log{msg: msg}
	case constants.MsgTypeCancelTask:
		// task-related messages
		return &Task{msg: msg}
	case constants.MsgTypeGetSystemInfo:
		// system-info messages
		return &SystemInfo{msg: msg}
	case constants.MsgTypeRemoveSpider:
		// spider-related messages
		return &Spider{SpiderId: msg.SpiderId}
	default:
		return nil
	}
}
type NodeMessage struct {
// 通信类别
Type string `json:"type"`
// 任务相关
TaskId string `json:"task_id"` // 任务ID
// 节点相关
NodeId string `json:"node_id"` // 节点ID
// 日志相关
LogPath string `json:"log_path"` // 日志路径
Log string `json:"log"` // 日志
// 系统信息
SysInfo model.SystemInfo `json:"sys_info"`
// 错误相关
Error string `json:"error"`
}

View File

@@ -3,15 +3,15 @@ package msg_handler
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
"runtime/debug"
)
type Log struct {
msg NodeMessage
msg entity.NodeMessage
}
func (g *Log) Handle() error {
@@ -25,33 +25,27 @@ func (g *Log) Handle() error {
func (g *Log) get() error {
// 发出的消息
msgSd := NodeMessage{
msgSd := entity.NodeMessage{
Type: constants.MsgTypeGetLog,
TaskId: g.msg.TaskId,
}
// 获取本地日志
logStr, err := model.GetLocalLog(g.msg.LogPath)
log.Info(utils.BytesToString(logStr))
if err != nil {
log.Errorf(err.Error())
log.Errorf("get node local log error: %s", err.Error())
debug.PrintStack()
msgSd.Error = err.Error()
msgSd.Log = err.Error()
} else {
msgSd.Log = utils.BytesToString(logStr)
}
// 序列化
msgSdBytes, err := json.Marshal(&msgSd)
if err != nil {
return err
}
// 发布消息给主节点
log.Info("publish get log msg to master")
if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil {
if err := database.Pub(constants.ChannelMasterNode, msgSd); err != nil {
log.Errorf("pub log to master node error: %s", err.Error())
debug.PrintStack()
return err
}
log.Infof(msgSd.Log)
return nil
}

View File

@@ -0,0 +1,24 @@
package msg_handler
import (
"crawlab/model"
"crawlab/utils"
"github.com/globalsign/mgo/bson"
"github.com/spf13/viper"
"path/filepath"
)
// Spider handles spider-related node messages; currently the only action
// is removing a spider's files from the local node.
type Spider struct {
	SpiderId string // hex string of the spider's ObjectId
}
// Handle removes the spider's local source directory on this node. The
// spider record is fetched by SpiderId, and its directory is built as
// <spider.path>/<spider.Name>, with spider.path read from the viper config.
func (s *Spider) Handle() error {
	// Remove the local spider directory.
	spider, err := model.GetSpider(bson.ObjectIdHex(s.SpiderId))
	if err != nil {
		return err
	}
	path := filepath.Join(viper.GetString("spider.path"), spider.Name)
	// NOTE(review): RemoveFiles' result is ignored here — confirm it
	// reports/logs failures internally.
	utils.RemoveFiles(path)
	return nil
}

View File

@@ -3,15 +3,12 @@ package msg_handler
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
"runtime/debug"
)
type SystemInfo struct {
msg NodeMessage
msg entity.NodeMessage
}
func (s *SystemInfo) Handle() error {
@@ -20,19 +17,12 @@ func (s *SystemInfo) Handle() error {
if err != nil {
return err
}
msgSd := NodeMessage{
msgSd := entity.NodeMessage{
Type: constants.MsgTypeGetSystemInfo,
NodeId: s.msg.NodeId,
SysInfo: sysInfo,
}
msgSdBytes, err := json.Marshal(&msgSd)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil {
log.Errorf(err.Error())
if err := database.Pub(constants.ChannelMasterNode, msgSd); err != nil {
return err
}
return nil

View File

@@ -2,16 +2,39 @@ package msg_handler
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
"github.com/apex/log"
"runtime/debug"
"time"
)
type Task struct {
msg NodeMessage
msg entity.NodeMessage
}
func (t *Task) Handle() error {
log.Infof("received cancel task msg, task_id: %s", t.msg.TaskId)
// 取消任务
ch := utils.TaskExecChanMap.ChanBlocked(t.msg.TaskId)
ch <- constants.TaskCancel
if ch != nil {
ch <- constants.TaskCancel
} else {
log.Infof("chan is empty, update status to abnormal")
// 节点可能被重启找不到chan
task, err := model.GetTask(t.msg.TaskId)
if err != nil {
log.Errorf("task not found, task_id: %s", t.msg.TaskId)
debug.PrintStack()
return err
}
task.Status = constants.StatusAbnormal
task.FinishTs = time.Now()
if err := task.Save(); err != nil {
debug.PrintStack()
log.Infof("cancel task error: %s", err.Error())
}
}
return nil
}

View File

@@ -1,9 +1,9 @@
package services
import (
"context"
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/msg_handler"
@@ -14,7 +14,6 @@ import (
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"github.com/gomodule/redigo/redis"
"github.com/spf13/viper"
"runtime/debug"
"time"
)
@@ -28,77 +27,10 @@ type Data struct {
UpdateTsUnix int64 `json:"update_ts_unix"`
}
const (
Yes = "Y"
No = "N"
)
// 获取本机节点
func GetCurrentNode() (model.Node, error) {
// 获得注册的key值
key, err := register.GetRegister().GetKey()
if err != nil {
return model.Node{}, err
}
// 从数据库中获取当前节点
var node model.Node
errNum := 0
for {
// 如果错误次数超过10次
if errNum >= 10 {
panic("cannot get current node")
}
// 尝试获取节点
node, err = model.GetNodeByKey(key)
// 如果获取失败
if err != nil {
// 如果为主节点,表示为第一次注册,插入节点信息
if IsMaster() {
// 获取本机信息
ip, mac, key, err := model.GetNodeBaseInfo()
if err != nil {
debug.PrintStack()
return node, err
}
// 生成节点
node = model.Node{
Key: key,
Id: bson.NewObjectId(),
Ip: ip,
Name: ip,
Mac: mac,
IsMaster: true,
}
if err := node.Add(); err != nil {
return node, err
}
return node, nil
}
// 增加错误次数
errNum++
// 5秒后重试
time.Sleep(5 * time.Second)
continue
}
// 跳出循环
break
}
return node, nil
}
// 当前节点是否为主节点
func IsMaster() bool {
return viper.GetString("server.master") == Yes
}
// 所有调用IsMasterNode的方法都永远会在master节点执行所以GetCurrentNode方法返回永远是master节点
// 该ID的节点是否为主节点
func IsMasterNode(id string) bool {
curNode, _ := GetCurrentNode()
curNode, _ := model.GetCurrentNode()
node, _ := model.GetNode(bson.ObjectIdHex(id))
return curNode.Id == node.Id
}
@@ -156,6 +88,8 @@ func UpdateNodeStatus() {
handleNodeInfo(key, data)
}
// 重新获取list
list, _ = database.RedisClient.HKeys("nodes")
// 重置不在redis的key为offline
model.ResetNodeStatusToOffline(list)
}
@@ -168,7 +102,7 @@ func handleNodeInfo(key string, data Data) {
// 同个key可能因为并发被注册多次
var nodes []model.Node
_ = c.Find(bson.M{"key": key}).All(&nodes)
if nodes != nil && len(nodes) > 1 {
if len(nodes) > 1 {
for _, node := range nodes {
_ = c.RemoveId(node.Id)
}
@@ -178,13 +112,15 @@ func handleNodeInfo(key string, data Data) {
if err := c.Find(bson.M{"key": key}).One(&node); err != nil {
// 数据库不存在该节点
node = model.Node{
Key: key,
Name: data.Ip,
Ip: data.Ip,
Port: "8000",
Mac: data.Mac,
Status: constants.StatusOnline,
IsMaster: data.Master,
Key: key,
Name: data.Ip,
Ip: data.Ip,
Port: "8000",
Mac: data.Mac,
Status: constants.StatusOnline,
IsMaster: data.Master,
UpdateTs: time.Now(),
UpdateTsUnix: time.Now().Unix(),
}
if err := node.Add(); err != nil {
log.Errorf(err.Error())
@@ -193,6 +129,8 @@ func handleNodeInfo(key string, data Data) {
} else {
// 数据库存在该节点
node.Status = constants.StatusOnline
node.UpdateTs = time.Now()
node.UpdateTsUnix = time.Now().Unix()
if err := node.Save(); err != nil {
log.Errorf(err.Error())
return
@@ -217,13 +155,17 @@ func UpdateNodeData() {
}
// 获取redis的key
key, err := register.GetRegister().GetKey()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
// 构造节点数据
data := Data{
Key: key,
Mac: mac,
Ip: ip,
Master: IsMaster(),
Master: model.IsMaster(),
UpdateTs: time.Now(),
UpdateTsUnix: time.Now().Unix(),
}
@@ -243,7 +185,7 @@ func UpdateNodeData() {
func MasterNodeCallback(message redis.Message) (err error) {
// 反序列化
var msg msg_handler.NodeMessage
var msg entity.NodeMessage
if err := json.Unmarshal(message.Data, &msg); err != nil {
return err
@@ -251,7 +193,6 @@ func MasterNodeCallback(message redis.Message) (err error) {
if msg.Type == constants.MsgTypeGetLog {
// 获取日志
fmt.Println(msg)
time.Sleep(10 * time.Millisecond)
ch := TaskLogChanMap.ChanBlocked(msg.TaskId)
ch <- msg.Log
@@ -268,14 +209,10 @@ func MasterNodeCallback(message redis.Message) (err error) {
func WorkerNodeCallback(message redis.Message) (err error) {
// 反序列化
msg := msg_handler.NodeMessage{}
if err := json.Unmarshal(message.Data, &msg); err != nil {
return err
}
// worker message handle
if err := msg_handler.GetMsgHandler(msg).Handle(); err != nil {
msg := utils.GetMessage(message)
if err := msg_handler.GetMsgHandler(*msg).Handle(); err != nil {
log.Errorf("msg handler error: %s", err.Error())
debug.PrintStack()
return err
}
return nil
@@ -297,30 +234,32 @@ func InitNodeService() error {
UpdateNodeData()
// 获取当前节点
node, err := GetCurrentNode()
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
return err
}
ctx := context.Background()
if IsMaster() {
if model.IsMaster() {
// 如果为主节点,订阅主节点通信频道
channel := "nodes:master"
err := database.RedisClient.Subscribe(ctx, MasterNodeCallback, channel)
if err != nil {
if err := database.Sub(constants.ChannelMasterNode, MasterNodeCallback); err != nil {
return err
}
} else {
// 若为工作节点,订阅单独指定通信频道
channel := "nodes:" + node.Id.Hex()
err := database.RedisClient.Subscribe(ctx, WorkerNodeCallback, channel)
if err != nil {
channel := constants.ChannelWorkerNode + node.Id.Hex()
if err := database.Sub(channel, WorkerNodeCallback); err != nil {
return err
}
}
// 订阅全通道
if err := database.Sub(constants.ChannelAllNode, WorkerNodeCallback); err != nil {
return err
}
// 如果为主节点每30秒刷新所有节点信息
if IsMaster() {
if model.IsMaster() {
spec := "*/10 * * * * *"
if _, err := c.AddFunc(spec, UpdateNodeStatus); err != nil {
debug.PrintStack()
@@ -328,6 +267,12 @@ func InitNodeService() error {
}
}
// 更新在当前节点执行中的任务状态为abnormal
if err := model.UpdateTaskToAbnormal(node.Id); err != nil {
debug.PrintStack()
return err
}
c.Start()
return nil
}

View File

@@ -5,7 +5,7 @@ import (
"crawlab/lib/cron"
"crawlab/model"
"github.com/apex/log"
uuid "github.com/satori/go.uuid"
"github.com/satori/go.uuid"
"runtime/debug"
)
@@ -17,7 +17,22 @@ type Scheduler struct {
func AddTask(s model.Schedule) func() {
return func() {
nodeId := s.NodeId
node, err := model.GetNodeByKey(s.NodeKey)
if err != nil || node.Id.Hex() == "" {
log.Errorf("get node by key error: %s", err.Error())
debug.PrintStack()
return
}
spider := model.GetSpiderByName(s.SpiderName)
if spider == nil || spider.Id.Hex() == "" {
log.Errorf("get spider by name error: %s", err.Error())
debug.PrintStack()
return
}
// 同步ID到定时任务
s.SyncNodeIdAndSpiderId(node, *spider)
// 生成任务ID
id := uuid.NewV4()
@@ -25,8 +40,8 @@ func AddTask(s model.Schedule) func() {
// 生成任务模型
t := model.Task{
Id: id.String(),
SpiderId: s.SpiderId,
NodeId: nodeId,
SpiderId: spider.Id,
NodeId: node.Id,
Status: constants.StatusPending,
Param: s.Param,
}
@@ -62,12 +77,16 @@ func (s *Scheduler) Start() error {
// 更新任务列表
if err := s.Update(); err != nil {
log.Errorf("update scheduler error: %s", err.Error())
debug.PrintStack()
return err
}
// 每30秒更新一次任务列表
spec := "*/30 * * * * *"
if _, err := exec.AddFunc(spec, UpdateSchedules); err != nil {
log.Errorf("add func update schedulers error: %s", err.Error())
debug.PrintStack()
return err
}
@@ -80,12 +99,16 @@ func (s *Scheduler) AddJob(job model.Schedule) error {
// 添加任务
eid, err := s.cron.AddFunc(spec, AddTask(job))
if err != nil {
log.Errorf("add func task error: %s", err.Error())
debug.PrintStack()
return err
}
// 更新EntryID
job.EntryId = eid
if err := job.Save(); err != nil {
log.Errorf("job save error: %s", err.Error())
debug.PrintStack()
return err
}
@@ -99,6 +122,18 @@ func (s *Scheduler) RemoveAll() {
}
}
// 验证cron表达式是否正确
// ParserCron validates a cron expression in the six-field
// (seconds-resolution) format used by the scheduler. It returns the
// parser's error, or nil when the expression is well-formed.
func ParserCron(spec string) error {
	parser := cron.NewParser(
		cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
	)
	_, err := parser.Parse(spec)
	return err
}
func (s *Scheduler) Update() error {
// 删除所有定时任务
s.RemoveAll()
@@ -106,6 +141,8 @@ func (s *Scheduler) Update() error {
// 获取所有定时任务
sList, err := model.GetScheduleList(nil)
if err != nil {
log.Errorf("get scheduler list error: %s", err.Error())
debug.PrintStack()
return err
}
@@ -116,6 +153,8 @@ func (s *Scheduler) Update() error {
// 添加到定时任务
if err := s.AddJob(job); err != nil {
log.Errorf("add job error: %s, job: %s, cron: %s", err.Error(), job.Name, job.Cron)
debug.PrintStack()
return err
}
}
@@ -128,6 +167,8 @@ func InitScheduler() error {
cron: cron.New(cron.WithSeconds()),
}
if err := Sched.Start(); err != nil {
log.Errorf("start scheduler error: %s", err.Error())
debug.PrintStack()
return err
}
return nil

View File

@@ -1,28 +1,22 @@
package services
import (
"context"
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/spider_handler"
"crawlab/utils"
"encoding/json"
"fmt"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"github.com/satori/go.uuid"
"github.com/spf13/viper"
"io"
"io/ioutil"
"os"
"path/filepath"
"runtime/debug"
"strings"
"syscall"
)
type SpiderFileData struct {
@@ -36,175 +30,14 @@ type SpiderUploadMessage struct {
SpiderId string
}
// GetSpidersFromDir scans the configured spider project directory
// (viper key "spider.path") and builds a model.Spider entry for every
// non-hidden subdirectory found there. If the directory does not exist
// it is created with mode 0766 (umask temporarily cleared so the mode
// is applied exactly).
//
// Returns the (possibly empty) spider list, or an error when the
// directory cannot be created or read. Errors are now logged with a
// message before the stack dump, matching the file's logging convention
// (previously only debug.PrintStack was called).
func GetSpidersFromDir() ([]model.Spider, error) {
	srcPath := viper.GetString("spider.path")

	// Create the spider project directory if it is missing.
	if !utils.Exists(srcPath) {
		mask := syscall.Umask(0)  // clear umask so 0766 applies as-is
		defer syscall.Umask(mask) // restore the previous umask
		if err := os.MkdirAll(srcPath, 0766); err != nil {
			log.Errorf("mkdir spider.path error: %s, path: %s", err.Error(), srcPath)
			debug.PrintStack()
			return []model.Spider{}, err
		}
	}

	// List everything directly under the spider directory.
	items, err := ioutil.ReadDir(srcPath)
	if err != nil {
		log.Errorf("read spider.path error: %s, path: %s", err.Error(), srcPath)
		debug.PrintStack()
		return []model.Spider{}, err
	}

	spiders := make([]model.Spider, 0)
	for _, item := range items {
		// Only non-hidden directories count as spider projects.
		if !item.IsDir() || strings.HasPrefix(item.Name(), ".") {
			continue
		}
		spiders = append(spiders, model.Spider{
			Name:        item.Name(),
			DisplayName: item.Name(),
			Type:        constants.Customized,
			Src:         filepath.Join(srcPath, item.Name()),
			FileId:      bson.ObjectIdHex(constants.ObjectIdNull),
		})
	}
	return spiders, nil
}
// SaveSpiders synchronizes the given spider list into the "spiders"
// collection. An empty list removes every stored spider. For each
// customized spider: duplicates sharing the same Src are collapsed to a
// single record, and unknown spiders are inserted. Individual failures
// are logged and skipped so one bad record does not abort the sync.
//
// Fix: Add() failures were logged as "remove spider error" (copy-paste
// from the RemoveAll branch); they now say "add spider error".
func SaveSpiders(spiders []model.Spider) error {
	s, c := database.GetCol("spiders")
	defer s.Close()

	// Nothing on disk => mirror that by clearing the collection.
	if len(spiders) == 0 {
		if err := model.RemoveAllSpider(); err != nil {
			log.Error("remove all spider error:" + err.Error())
			return err
		}
		log.Info("get spider from dir is empty,removed all spider")
		return nil
	}

	for _, spider := range spiders {
		// Only customized spiders are managed from the directory scan.
		if spider.Type != constants.Customized {
			continue
		}
		var existing []*model.Spider
		_ = c.Find(bson.M{"src": spider.Src}).All(&existing)

		// Duplicate records for the same source path: wipe and re-add one.
		if len(existing) > 1 {
			if _, err := c.RemoveAll(bson.M{"src": spider.Src}); err != nil {
				log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src)
				debug.PrintStack()
				continue
			}
			if err := spider.Add(); err != nil {
				log.Errorf("add spider error: %v, src:%v", err.Error(), spider.Src)
				debug.PrintStack()
			}
			continue
		}

		// Unknown spider: insert it.
		if len(existing) == 0 {
			if err := spider.Add(); err != nil {
				log.Errorf("add spider error: %v, src:%v", err.Error(), spider.Src)
				debug.PrintStack()
			}
		}
	}
	return nil
}
// UpdateSpiders refreshes the spider collection from the project
// directory: it scans the directory for spider folders and persists the
// result. Errors are logged and swallowed (this runs from a cron job).
func UpdateSpiders() {
	spiders, err := GetSpidersFromDir()
	if err != nil {
		log.Errorf(err.Error())
		return
	}
	if err = SaveSpiders(spiders); err != nil {
		log.Errorf(err.Error())
	}
}
// ZipSpider packs the spider's source directory into a zip file under
// the temp directory (viper key "other.tmppath") and returns the zip
// path. When the source directory is gone, the spider record is removed
// from the database (it would otherwise error forever) and an error is
// returned.
func ZipSpider(spider model.Spider) (filePath string, err error) {
	// Source directory must exist.
	if !utils.Exists(spider.Src) {
		debug.PrintStack()
		// Drop the stale spider record so it stops failing repeatedly.
		_ = model.RemoveSpider(spider.Id)
		return "", errors.New("source path does not exist")
	}

	// Ensure the temp directory exists.
	tmpPath := viper.GetString("other.tmppath")
	if !utils.Exists(tmpPath) {
		if err := os.MkdirAll(tmpPath, 0777); err != nil {
			log.Errorf("mkdir other.tmppath error: %v", err.Error())
			return "", err
		}
	}

	// Unique zip name for this run.
	filePath = filepath.Join(tmpPath, uuid.NewV4().String()+".zip")

	// Compress the source directory into the zip file.
	srcDir, err := os.Open(spider.Src)
	if err != nil {
		debug.PrintStack()
		return filePath, err
	}
	if err = utils.Compress([]*os.File{srcDir}, filePath); err != nil {
		return filePath, err
	}
	return filePath, nil
}
// 上传zip文件到GridFS
func UploadToGridFs(spider model.Spider, fileName string, filePath string) (fid bson.ObjectId, err error) {
func UploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, err error) {
fid = ""
// 获取MongoDB GridFS连接
s, gf := database.GetGridFs("files")
defer s.Close()
// 如果存在FileId删除GridFS上的老文件
if !utils.IsObjectIdNull(spider.FileId) {
if err = gf.RemoveId(spider.FileId); err != nil {
log.Error("remove gf file:" + err.Error())
debug.PrintStack()
}
}
// 创建一个新GridFS文件
f, err := gf.Create(fileName)
if err != nil {
@@ -234,6 +67,7 @@ func UploadToGridFs(spider model.Spider, fileName string, filePath string) (fid
return fid, nil
}
// 写入grid fs
func WriteToGridFS(content []byte, f *mgo.GridFile) {
if _, err := f.Write(content); err != nil {
debug.PrintStack()
@@ -248,7 +82,7 @@ func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCre
log.Infof("can't opened this file")
return err
}
defer f.Close()
defer utils.Close(f)
s := make([]byte, 4096)
for {
switch nr, err := f.Read(s[:]); true {
@@ -264,149 +98,101 @@ func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCre
}
// 发布所有爬虫
func PublishAllSpiders() error {
func PublishAllSpiders() {
// 获取爬虫列表
spiders, err := model.GetSpiderList(nil, 0, constants.Infinite)
if err != nil {
log.Errorf(err.Error())
return err
spiders, _, _ := model.GetSpiderList(nil, 0, constants.Infinite)
if len(spiders) == 0 {
return
}
log.Infof("start sync spider to local, total: %d", len(spiders))
// 遍历爬虫列表
for _, spider := range spiders {
// 发布爬虫
if err := PublishSpider(spider); err != nil {
log.Errorf("publish spider error:" + err.Error())
// return err
}
}
return nil
}
func PublishAllSpidersJob() {
if err := PublishAllSpiders(); err != nil {
log.Errorf(err.Error())
// 异步发布爬虫
go func(s model.Spider) {
PublishSpider(s)
}(spider)
}
}
// 发布爬虫
// 1. 将源文件夹打包为zip文件
// 2. 上传zip文件到GridFS
// 3. 发布消息给工作节点
func PublishSpider(spider model.Spider) (err error) {
// 将源文件夹打包为zip文件
filePath, err := ZipSpider(spider)
if err != nil {
return err
}
// 上传zip文件到GridFS
fileName := filepath.Base(spider.Src) + ".zip"
fid, err := UploadToGridFs(spider, fileName, filePath)
if err != nil {
return err
}
// 保存FileId
spider.FileId = fid
if err := spider.Save(); err != nil {
return err
}
// 发布消息给工作节点
msg := SpiderUploadMessage{
FileId: fid.Hex(),
FileName: fileName,
SpiderId: spider.Id.Hex(),
}
msgStr, err := json.Marshal(msg)
if err != nil {
func PublishSpider(spider model.Spider) {
// 查询gf file不存在则删除
gfFile := model.GetGridFs(spider.FileId)
if gfFile == nil {
_ = model.RemoveSpider(spider.Id)
return
}
channel := "files:upload"
if _, err = database.RedisClient.Publish(channel, utils.BytesToString(msgStr)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
spiderSync := spider_handler.SpiderSync{
Spider: spider,
}
return
//目录不存在,则直接下载
path := filepath.Join(viper.GetString("spider.path"), spider.Name)
if !utils.Exists(path) {
log.Infof("path not found: %s", path)
spiderSync.Download()
spiderSync.CreateMd5File(gfFile.Md5)
return
}
// md5文件不存在则下载
md5 := filepath.Join(path, spider_handler.Md5File)
if !utils.Exists(md5) {
log.Infof("md5 file not found: %s", md5)
spiderSync.RemoveSpiderFile()
spiderSync.Download()
spiderSync.CreateMd5File(gfFile.Md5)
return
}
// md5值不一样则下载
md5Str := utils.ReadFileOneLine(md5)
// 去掉空格以及换行符
md5Str = strings.Replace(md5Str, " ", "", -1)
md5Str = strings.Replace(md5Str, "\n", "", -1)
if gfFile.Md5 != md5Str {
log.Infof("md5 is different, gf-md5:%s, file-md5:%s", gfFile.Md5, md5Str)
spiderSync.RemoveSpiderFile()
spiderSync.Download()
spiderSync.CreateMd5File(gfFile.Md5)
return
}
}
// 上传爬虫回调
func OnFileUpload(message redis.Message) (err error) {
s, gf := database.GetGridFs("files")
defer s.Close()
// 反序列化消息
var msg SpiderUploadMessage
if err := json.Unmarshal(message.Data, &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// 从GridFS获取该文件
f, err := gf.OpenId(bson.ObjectIdHex(msg.FileId))
func RemoveSpider(id string) error {
// 获取该爬虫
spider, err := model.GetSpider(bson.ObjectIdHex(id))
if err != nil {
log.Errorf("open file id: " + msg.FileId + ", spider id:" + msg.SpiderId + ", error: " + err.Error())
debug.PrintStack()
return err
}
defer f.Close()
// 生成唯一ID
randomId := uuid.NewV4()
tmpPath := viper.GetString("other.tmppath")
if !utils.Exists(tmpPath) {
if err := os.MkdirAll(tmpPath, 0777); err != nil {
log.Errorf("mkdir other.tmppath error: %v", err.Error())
return err
}
}
// 创建临时文件
tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip")
tmpFile, err := os.OpenFile(tmpFilePath, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
defer tmpFile.Close()
// 将该文件写入临时文件
if _, err := io.Copy(tmpFile, f); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// 解压缩临时文件到目标文件夹
dstPath := filepath.Join(
viper.GetString("spider.path"),
// strings.Replace(msg.FileName, ".zip", "", -1),
)
if err := utils.DeCompress(tmpFile, dstPath); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
// 删除爬虫文件目录
path := filepath.Join(viper.GetString("spider.path"), spider.Name)
utils.RemoveFiles(path)
// 删除其他节点的爬虫目录
msg := entity.NodeMessage{
Type: constants.MsgTypeRemoveSpider,
SpiderId: id,
}
if err := database.Pub(constants.ChannelAllNode, msg); err != nil {
return err
}
// 关闭临时文件
if err := tmpFile.Close(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
// 从数据库中删除该爬虫
if err := model.RemoveSpider(bson.ObjectIdHex(id)); err != nil {
return err
}
// 删除临时文件
if err := os.Remove(tmpFilePath); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
// 删除日志文件
if err := RemoveLogBySpiderId(spider.Id); err != nil {
return err
}
// 删除爬虫对应的task任务
if err := model.RemoveTaskBySpiderId(spider.Id); err != nil {
return err
}
// TODO 定时任务如何处理
return nil
}
@@ -414,31 +200,9 @@ func OnFileUpload(message redis.Message) (err error) {
func InitSpiderService() error {
// 构造定时任务执行器
c := cron.New(cron.WithSeconds())
if IsMaster() {
// 主节点
// 每5秒更新一次爬虫信息
if _, err := c.AddFunc("*/5 * * * * *", UpdateSpiders); err != nil {
return err
}
// 每60秒同步爬虫给工作节点
if _, err := c.AddFunc("0 * * * * *", PublishAllSpidersJob); err != nil {
return err
}
} else {
// 非主节点
// 订阅文件上传
channel := "files:upload"
//sub.Connect()
ctx := context.Background()
return database.RedisClient.Subscribe(ctx, OnFileUpload, channel)
if _, err := c.AddFunc("0 * * * * *", PublishAllSpiders); err != nil {
return err
}
// 启动定时任务
c.Start()

View File

@@ -0,0 +1,137 @@
package spider_handler
import (
"crawlab/database"
"crawlab/model"
"crawlab/utils"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"github.com/satori/go.uuid"
"github.com/spf13/viper"
"io"
"os"
"path/filepath"
"runtime/debug"
)
const (
Md5File = "md5.txt"
)
type SpiderSync struct {
Spider model.Spider
}
// CreateMd5File writes the given md5 string (plus a newline) into the
// md5.txt marker file inside this spider's local directory, creating
// the directory first when needed. Write failures are logged only.
//
// Fix: the previous version deferred utils.Close(file) before checking
// whether OpenFile returned nil, closing a nil handle on open failure.
func (s *SpiderSync) CreateMd5File(md5 string) {
	path := filepath.Join(viper.GetString("spider.path"), s.Spider.Name)
	utils.CreateFilePath(path)

	fileName := filepath.Join(path, Md5File)
	file := utils.OpenFile(fileName)
	if file == nil {
		// OpenFile already logged the failure; nothing to write to.
		return
	}
	defer utils.Close(file)

	if _, err := file.WriteString(md5 + "\n"); err != nil {
		log.Errorf("file write string error: %s", err.Error())
		debug.PrintStack()
	}
}
// GetLockDownloadKey builds the Redis key that guards a spider download
// on this node, shaped "<nodeId>#<spiderId>".
// NOTE(review): the error from GetCurrentNode is discarded; if the
// lookup fails the key is built from a zero-value node id — confirm
// this is acceptable for lock correctness.
func (s *SpiderSync) GetLockDownloadKey(spiderId string) string {
	currentNode, _ := model.GetCurrentNode()
	return currentNode.Id.Hex() + "#" + spiderId
}
// RemoveSpiderFile deletes this spider's local directory (used before a
// re-download when the spider files changed). Failures are logged only.
func (s *SpiderSync) RemoveSpiderFile() {
	spiderPath := filepath.Join(
		viper.GetString("spider.path"),
		s.Spider.Name,
	)
	err := os.RemoveAll(spiderPath)
	if err != nil {
		log.Errorf("remove spider files error: %s, path: %s", err.Error(), spiderPath)
		debug.PrintStack()
	}
}
// CheckDownLoading reports whether a download for this spider is already
// in progress on this node (the lock key exists in the Redis "spider"
// hash), and returns the lock key either way.
// NOTE(review): fileId is currently unused; kept for interface
// compatibility with existing callers.
func (s *SpiderSync) CheckDownLoading(spiderId string, fileId string) (bool, string) {
	key := s.GetLockDownloadKey(spiderId)
	_, err := database.RedisClient.HGet("spider", key)
	return err == nil, key
}
// Download fetches the spider's packaged zip from MongoDB GridFS into a
// temp file and extracts it into the local spider directory. A Redis
// hash entry ("spider" -> lock key) prevents concurrent downloads of
// the same spider on this node.
//
// Fixes over the previous version:
//   - utils.Close(f) was deferred BEFORE the OpenId error check, which
//     closed a nil GridFS file when the open failed;
//   - the Redis lock was released only on the success path, so any
//     error left the download permanently locked — it is now released
//     via defer on every exit;
//   - a nil temp-file handle from utils.OpenFile is now checked.
func (s *SpiderSync) Download() {
	spiderId := s.Spider.Id.Hex()
	fileId := s.Spider.FileId.Hex()

	// Take the per-node download lock.
	isDownloading, key := s.CheckDownLoading(spiderId, fileId)
	if isDownloading {
		return
	}
	_ = database.RedisClient.HSet("spider", key, key)
	defer func() {
		_ = database.RedisClient.HDel("spider", key)
	}()

	session, gf := database.GetGridFs("files")
	defer session.Close()

	f, err := gf.OpenId(bson.ObjectIdHex(fileId))
	if err != nil {
		log.Errorf("open file id: " + fileId + ", spider id:" + spiderId + ", error: " + err.Error())
		debug.PrintStack()
		return
	}
	// Close only after the error check — f is nil on failure.
	defer utils.Close(f)

	// Ensure the temp directory exists.
	tmpPath := viper.GetString("other.tmppath")
	if !utils.Exists(tmpPath) {
		if err := os.MkdirAll(tmpPath, 0777); err != nil {
			log.Errorf("mkdir other.tmppath error: %v", err.Error())
			return
		}
	}

	// Unique temp file for this download.
	randomId := uuid.NewV4()
	tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip")
	tmpFile := utils.OpenFile(tmpFilePath)
	if tmpFile == nil {
		// OpenFile already logged the failure.
		return
	}
	defer utils.Close(tmpFile)

	// Stream the GridFS content into the temp file.
	if _, err := io.Copy(tmpFile, f); err != nil {
		log.Errorf("copy file error: %s, file_id: %s", err.Error(), f.Id())
		debug.PrintStack()
		return
	}

	// Extract into the local spider directory.
	dstPath := filepath.Join(
		viper.GetString("spider.path"),
		s.Spider.Name,
	)
	if err := utils.DeCompress(tmpFile, dstPath); err != nil {
		log.Errorf(err.Error())
		debug.PrintStack()
		return
	}

	// Close before removing the temp file (the later deferred Close on
	// an already-closed file only logs a harmless error).
	if err := tmpFile.Close(); err != nil {
		log.Errorf(err.Error())
		debug.PrintStack()
		return
	}
	if err := os.Remove(tmpFilePath); err != nil {
		log.Errorf(err.Error())
		debug.PrintStack()
		return
	}
}

View File

@@ -0,0 +1,53 @@
package spider_handler
import (
"crawlab/config"
"crawlab/database"
"crawlab/model"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"runtime/debug"
"testing"
)
var s SpiderSync
// init loads the test configuration and opens live MongoDB and Redis
// connections, then prepares a SpiderSync fixture pointing at
// hard-coded document ids.
// NOTE(review): these are integration tests — they require running
// MongoDB/Redis instances and the ObjectIds below to exist in the test
// database; confirm before running in CI.
func init() {
if err := config.InitConfig("../../conf/config.yml"); err != nil {
log.Fatal("Init config failed")
}
log.Infof("初始化配置成功")
// Initialize the MongoDB connection; abort the test binary on failure.
if err := database.InitMongo(); err != nil {
log.Error("init mongodb error:" + err.Error())
debug.PrintStack()
panic(err)
}
log.Info("初始化Mongodb数据库成功")
// Initialize the Redis connection; abort the test binary on failure.
if err := database.InitRedis(); err != nil {
log.Error("init redis error:" + err.Error())
debug.PrintStack()
panic(err)
}
log.Info("初始化Redis数据库成功")
// Fixture spider: ids/paths must reference existing test data.
s = SpiderSync{
Spider: model.Spider{
Id: bson.ObjectIdHex("5d8d8326bc3c4f000186e5df"),
Name: "scrapy-pre_sale",
FileId: bson.ObjectIdHex("5d8d8326bc3c4f000186e5db"),
Src: "/opt/crawlab/spiders/scrapy-pre_sale",
},
}
}
// TestSpiderSync_CreateMd5File writes a marker md5 file for the fixture
// spider; it exercises the happy path only (no assertions).
func TestSpiderSync_CreateMd5File(t *testing.T) {
s.CreateMd5File("this is md5")
}
// TestSpiderSync_Download downloads the fixture spider's package from
// GridFS; it requires the fixture FileId to exist (no assertions).
func TestSpiderSync_Download(t *testing.T) {
s.Download()
}

View File

@@ -3,17 +3,17 @@ package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/services/msg_handler"
"crawlab/utils"
"encoding/json"
)
var SystemInfoChanMap = utils.NewChanMap()
func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
func GetRemoteSystemInfo(id string) (sysInfo entity.SystemInfo, err error) {
// 发送消息
msg := msg_handler.NodeMessage{
msg := entity.NodeMessage{
Type: constants.MsgTypeGetSystemInfo,
NodeId: id,
}
@@ -21,7 +21,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
// 序列化
msgBytes, _ := json.Marshal(&msg)
if _, err := database.RedisClient.Publish("nodes:"+id, utils.BytesToString(msgBytes)); err != nil {
return model.SystemInfo{}, err
return entity.SystemInfo{}, err
}
// 通道
@@ -38,7 +38,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
return sysInfo, nil
}
func GetSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
func GetSystemInfo(id string) (sysInfo entity.SystemInfo, err error) {
if IsMasterNode(id) {
sysInfo, err = model.GetLocalSystemInfo()
} else {

View File

@@ -3,9 +3,9 @@ package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/msg_handler"
"crawlab/utils"
"encoding/json"
"errors"
@@ -18,6 +18,7 @@ import (
"runtime/debug"
"strconv"
"sync"
"syscall"
"time"
)
@@ -99,73 +100,143 @@ func AssignTask(task model.Task) error {
return nil
}
// SetEnv populates cmd.Env for a task run: the inherited OS environment,
// Crawlab defaults (task id, output collection, Python/TZ settings),
// then the task's own environment variables. Returns cmd for chaining.
func SetEnv(cmd *exec.Cmd, envs []model.Env, taskId string, dataCol string) *exec.Cmd {
	// Inherited OS environment plus Crawlab defaults.
	defaults := []string{
		"CRAWLAB_TASK_ID=" + taskId,
		"CRAWLAB_COLLECTION=" + dataCol,
		"PYTHONUNBUFFERED=0",
		"PYTHONIOENCODING=utf-8",
		"TZ=Asia/Shanghai",
	}
	cmd.Env = append(os.Environ(), defaults...)

	// Task-specific environment variables.
	for _, e := range envs {
		cmd.Env = append(cmd.Env, e.Name+"="+e.Value)
	}
	// TODO: global environment variables.
	return cmd
}
// SetLogConfig creates (truncates) the task log file at path and wires
// it to cmd's stdout and stderr.
// NOTE(review): the file handle is handed to cmd and never closed in
// this function — confirm the process lifecycle releases it.
func SetLogConfig(cmd *exec.Cmd, path string) error {
	logFile, err := os.Create(path)
	if err != nil {
		log.Errorf("create task log file error: %s", path)
		debug.PrintStack()
		return err
	}
	cmd.Stdout = logFile
	cmd.Stderr = logFile
	return nil
}
// FinishOrCancelTask blocks on ch until the task signals completion or
// cancellation. On cancel it SIGKILLs the whole process group (negative
// pid) and records cancelled/error status; otherwise it marks the task
// finished. The final status and finish time are persisted.
func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, t model.Task) {
	sig := <-ch // blocks until a signal arrives
	log.Infof("process received signal: %s", sig)

	switch {
	case sig == constants.TaskCancel && cmd.Process != nil:
		// Kill the entire process group so child processes die too.
		err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
		if err != nil {
			log.Errorf("process kill error: %s", err.Error())
			debug.PrintStack()
			t.Error = "kill process error: " + err.Error()
			t.Status = constants.StatusError
		} else {
			t.Error = "user kill the process ..."
			t.Status = constants.StatusCancelled
		}
	default:
		t.Status = constants.StatusFinished
	}

	t.FinishTs = time.Now()
	_ = t.Save()
}
// StartTaskProcess launches cmd asynchronously (exec.Cmd.Start). On
// failure the task is marked errored with a finish timestamp, persisted,
// and the error is returned.
//
// Fix: the log line used a Java-style "{}" placeholder, so the error
// text was never interpolated; it now uses the correct %s verb.
func StartTaskProcess(cmd *exec.Cmd, t model.Task) error {
	if err := cmd.Start(); err != nil {
		log.Errorf("start spider error: %s", err.Error())
		debug.PrintStack()
		t.Error = "start task error: " + err.Error()
		t.Status = constants.StatusError
		t.FinishTs = time.Now()
		_ = t.Save()
		return err
	}
	return nil
}
// WaitTaskProcess blocks until cmd exits. A non-zero exit that was NOT
// caused by a kill (exit code != -1) marks the task as errored and
// persists it; the wait error is returned either way so the caller can
// stop processing.
func WaitTaskProcess(cmd *exec.Cmd, t model.Task) error {
	err := cmd.Wait()
	if err == nil {
		return nil
	}
	log.Errorf("wait process finish error: %s", err.Error())
	debug.PrintStack()

	if exitError, ok := err.(*exec.ExitError); ok {
		code := exitError.ExitCode()
		log.Errorf("exit error, exit code: %d", code)
		// -1 means the process was killed (handled elsewhere); anything
		// else is a real failure to record on the task.
		if code != -1 {
			t.Error = err.Error()
			t.FinishTs = time.Now()
			t.Status = constants.StatusError
			_ = t.Save()
		}
	}
	return err
}
// 执行shell命令
func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (err error) {
log.Infof("cwd: " + cwd)
log.Infof("cmd: " + cmdStr)
log.Infof("cwd: %s", cwd)
log.Infof("cmd: %s", cmdStr)
// 生成执行命令
var cmd *exec.Cmd
if runtime.GOOS == constants.Windows {
cmd = exec.Command("cmd", "/C", cmdStr)
} else {
cmd = exec.Command("")
cmd = exec.Command("sh", "-c", cmdStr)
}
// 工作目录
cmd.Dir = cwd
// 指定stdout, stderr日志
fLog, err := os.Create(t.LogPath)
if err != nil {
HandleTaskError(t, err)
// 日志
if err := SetLogConfig(cmd, t.LogPath); err != nil {
return err
}
defer fLog.Close()
cmd.Stdout = fLog
cmd.Stderr = fLog
// 添加默认环境变量
cmd.Env = append(cmd.Env, "CRAWLAB_TASK_ID="+t.Id)
cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+s.Col)
// 添加任务环境变量
for _, env := range s.Envs {
cmd.Env = append(cmd.Env, env.Name+"="+env.Value)
}
// 环境变量配置
cmd = SetEnv(cmd, s.Envs, t.Id, s.Col)
// 起一个goroutine来监控进程
ch := utils.TaskExecChanMap.ChanBlocked(t.Id)
go func() {
// 传入信号,此处阻塞
signal := <-ch
if signal == constants.TaskCancel {
// 取消进程
if err := cmd.Process.Kill(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
t.Status = constants.StatusCancelled
}
go FinishOrCancelTask(ch, cmd, t)
// 保存任务
t.FinishTs = time.Now()
if err := t.Save(); err != nil {
log.Infof(err.Error())
debug.PrintStack()
return
}
}()
// kill的时候可以kill所有的子进程
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
// 开始执行
if err := cmd.Run(); err != nil {
HandleTaskError(t, err)
// 启动进程
if err := StartTaskProcess(cmd, t); err != nil {
return err
}
// 同步等待进程完成
if err := WaitTaskProcess(cmd, t); err != nil {
return err
}
ch <- constants.TaskFinish
return nil
}
@@ -177,6 +248,7 @@ func MakeLogDir(t model.Task) (fileDir string, err error) {
// 如果日志目录不存在,生成该目录
if !utils.Exists(fileDir) {
if err := os.MkdirAll(fileDir, 0777); err != nil {
log.Errorf("execute task, make log dir error: %s", err.Error())
debug.PrintStack()
return "", err
}
@@ -239,84 +311,57 @@ func ExecuteTask(id int) {
tic := time.Now()
// 获取当前节点
node, err := GetCurrentNode()
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
log.Errorf("execute task get current node error: %s", err.Error())
debug.PrintStack()
return
}
// 公共队列
queuePub := "tasks:public"
// 节点队列
queueCur := "tasks:node:" + node.Id.Hex()
// 节点队列任务
var msg string
msg, err = database.RedisClient.LPop(queueCur)
if err != nil {
if msg == "" {
// 节点队列没有任务,获取公共队列任务
msg, err = database.RedisClient.LPop(queuePub)
if err != nil {
if msg == "" {
// 公共队列没有任务
log.Debugf(GetWorkerPrefix(id) + "没有任务...")
return
} else {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
return
}
}
} else {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
return
if msg, err = database.RedisClient.LPop(queueCur); err != nil {
// 节点队列没有任务,获取公共队列任务
queuePub := "tasks:public"
if msg, err = database.RedisClient.LPop(queuePub); err != nil {
}
}
if msg == "" {
return
}
// 反序列化
tMsg := TaskMessage{}
if err := json.Unmarshal([]byte(msg), &tMsg); err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
log.Errorf("json string to struct error: %s", err.Error())
return
}
// 获取任务
t, err := model.GetTask(tMsg.Id)
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
log.Errorf("execute task, get task error: %s", err.Error())
return
}
// 获取爬虫
spider, err := t.GetSpider()
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
log.Errorf("execute task, get spider error: %s", err.Error())
return
}
// 创建日志目录
fileDir, err := MakeLogDir(t)
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
var fileDir string
if fileDir, err = MakeLogDir(t); err != nil {
return
}
// 获取日志文件路径
t.LogPath = GetLogFilePaths(fileDir)
// 创建日志目录文件夹
fileStdoutDir := filepath.Dir(t.LogPath)
if !utils.Exists(fileStdoutDir) {
if err := os.MkdirAll(fileStdoutDir, os.ModePerm); err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
return
}
}
// 工作目录
cwd := filepath.Join(
viper.GetString("spider.path"),
@@ -401,39 +446,37 @@ func ExecuteTask(id int) {
func GetTaskLog(id string) (logStr string, err error) {
task, err := model.GetTask(id)
if err != nil {
return "", err
return
}
logStr = ""
if IsMasterNode(task.NodeId.Hex()) {
// 若为主节点,获取本机日志
logBytes, err := model.GetLocalLog(task.LogPath)
logStr = utils.BytesToString(logBytes)
if err != nil {
log.Errorf(err.Error())
logStr = err.Error()
// return "", err
} else {
logStr = utils.BytesToString(logBytes)
}
} else {
// 若不为主节点,获取远端日志
logStr, err = GetRemoteLog(task)
if err != nil {
log.Errorf(err.Error())
return "", err
}
return logStr, err
}
// 若不为主节点,获取远端日志
logStr, err = GetRemoteLog(task)
if err != nil {
log.Errorf(err.Error())
return logStr, nil
}
return logStr, err
}
func CancelTask(id string) (err error) {
// 获取任务
task, err := model.GetTask(id)
if err != nil {
log.Errorf("task not found, task id : %s, error: %s", id, err.Error())
debug.PrintStack()
return err
}
@@ -443,24 +486,36 @@ func CancelTask(id string) (err error) {
}
// 获取当前节点(默认当前节点为主节点)
node, err := GetCurrentNode()
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf("get current node error: %s", err.Error())
debug.PrintStack()
return err
}
log.Infof("current node id is: %s", node.Id.Hex())
log.Infof("task node id is: %s", task.NodeId.Hex())
if node.Id == task.NodeId {
// 任务节点为主节点
// 获取任务执行频道
ch := utils.TaskExecChanMap.ChanBlocked(id)
// 发出取消进程信号
ch <- constants.TaskCancel
if ch != nil {
// 发出取消进程信号
ch <- constants.TaskCancel
} else {
if err := model.UpdateTaskToAbnormal(node.Id); err != nil {
log.Errorf("update task to abnormal : {}", err.Error())
debug.PrintStack()
return err
}
}
} else {
// 任务节点为工作节点
// 序列化消息
msg := msg_handler.NodeMessage{
msg := entity.NodeMessage{
Type: constants.MsgTypeCancelTask,
TaskId: id,
}

View File

@@ -17,9 +17,7 @@ func InitUserService() error {
Password: utils.EncryptPassword("admin"),
Role: constants.RoleAdmin,
}
if err := adminUser.Add(); err != nil {
// pass
}
_ = adminUser.Add()
return nil
}
func MakeToken(user *model.User) (tokenStr string, err error) {

View File

@@ -2,6 +2,7 @@ package utils
import (
"archive/zip"
"bufio"
"github.com/apex/log"
"io"
"os"
@@ -9,14 +10,54 @@ import (
"runtime/debug"
)
// RemoveFiles deletes path and everything under it; failures are logged
// with a stack trace but not returned.
func RemoveFiles(path string) {
	err := os.RemoveAll(path)
	if err == nil {
		return
	}
	log.Errorf("remove files error: %s, path: %s", err.Error(), path)
	debug.PrintStack()
}
// ReadFileOneLine returns the first line of fileName, including the
// trailing '\n' when present. Returns "" when the file cannot be opened
// or a real read error occurs.
//
// Fixes over the previous version:
//   - a file whose only line has no trailing newline returned "" (the
//     data arrived together with io.EOF and was discarded); that
//     content is now returned;
//   - a nil handle from OpenFile is guarded instead of being passed to
//     bufio/Close.
func ReadFileOneLine(fileName string) string {
	file := OpenFile(fileName)
	if file == nil {
		// OpenFile already logged the failure.
		return ""
	}
	defer Close(file)

	line, err := bufio.NewReader(file).ReadString('\n')
	if err != nil {
		if err == io.EOF {
			// Single line without a trailing newline: keep the content.
			return line
		}
		log.Errorf("read file error: %s", err.Error())
		return ""
	}
	return line
}
// OpenFile opens fileName for reading and writing, creating it when it
// does not exist. Returns nil (after logging) when the open fails, so
// callers must nil-check the result.
func OpenFile(fileName string) *os.File {
	f, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, os.ModePerm)
	if err == nil {
		return f
	}
	log.Errorf("create file error: %s, file_name: %s", err.Error(), fileName)
	debug.PrintStack()
	return nil
}
// CreateFilePath makes sure the directory filePath exists, creating all
// missing parents. Failures are logged only.
func CreateFilePath(filePath string) {
	if Exists(filePath) {
		return
	}
	if err := os.MkdirAll(filePath, os.ModePerm); err != nil {
		log.Errorf("create file error: %s, file_path: %s", err.Error(), filePath)
		debug.PrintStack()
	}
}
// 判断所给路径文件/文件夹是否存在
func Exists(path string) bool {
_, err := os.Stat(path) //os.Stat获取文件信息
if err != nil {
if os.IsExist(err) {
return true
}
return false
return os.IsExist(err)
}
return true
}
@@ -44,7 +85,7 @@ func DeCompressByPath(tarFile, dest string) error {
if err != nil {
return err
}
defer srcFile.Close()
defer Close(srcFile)
return DeCompress(srcFile, dest)
}
@@ -68,7 +109,7 @@ func DeCompress(srcFile *os.File, dstPath string) error {
debug.PrintStack()
return err
}
defer zipFile.Close()
defer Close(zipFile)
// 遍历zip内所有文件和目录
for _, innerFile := range zipFile.File {
@@ -112,7 +153,7 @@ func DeCompress(srcFile *os.File, dstPath string) error {
debug.PrintStack()
continue
}
defer newFile.Close()
defer Close(newFile)
// 拷贝该文件到新文件中
if _, err := io.Copy(newFile, srcFile); err != nil {
@@ -140,9 +181,9 @@ func DeCompress(srcFile *os.File, dstPath string) error {
//dest 压缩文件存放地址
func Compress(files []*os.File, dest string) error {
d, _ := os.Create(dest)
defer d.Close()
defer Close(d)
w := zip.NewWriter(d)
defer w.Close()
defer Close(w)
for _, file := range files {
err := _Compress(file, "", w)
if err != nil {
@@ -190,7 +231,7 @@ func _Compress(file *os.File, prefix string, zw *zip.Writer) error {
return err
}
_, err = io.Copy(writer, file)
file.Close()
Close(file)
if err != nil {
debug.PrintStack()
return err

View File

@@ -60,7 +60,7 @@ func TestCompress(t *testing.T) {
So(er, ShouldEqual, nil)
})
})
os.RemoveAll("testCompress")
_ = os.RemoveAll("testCompress")
}
func Zip(zipFile string, fileList []string) error {
@@ -69,16 +69,11 @@ func Zip(zipFile string, fileList []string) error {
if err != nil {
log.Fatal()
}
defer fw.Close()
defer Close(fw)
// 实例化新的 zip.Writer
zw := zip.NewWriter(fw)
defer func() {
// 检测一下是否成功关闭
if err := zw.Close(); err != nil {
log.Fatalln(err)
}
}()
defer Close(zw)
for _, fileName := range fileList {
fr, err := os.Open(fileName)
@@ -91,6 +86,9 @@ func Zip(zipFile string, fileList []string) error {
}
// 写入文件的头信息
fh, err := zip.FileInfoHeader(fi)
if err != nil {
return err
}
w, err := zw.CreateHeader(fh)
if err != nil {
return err
@@ -106,6 +104,10 @@ func Zip(zipFile string, fileList []string) error {
func TestDeCompress(t *testing.T) {
err := os.Mkdir("testDeCompress", os.ModePerm)
if err != nil {
t.Error(err)
}
err = Zip("demo.zip", []string{})
if err != nil {
t.Error("create zip file failed")
@@ -121,7 +123,7 @@ func TestDeCompress(t *testing.T) {
err := DeCompress(tmpFile, dstPath)
So(err, ShouldEqual, nil)
})
os.RemoveAll("testDeCompress")
os.Remove("demo.zip")
_ = os.RemoveAll("testDeCompress")
_ = os.Remove("demo.zip")
}

View File

@@ -1,7 +1,42 @@
package utils
import "unsafe"
import (
"crawlab/entity"
"encoding/json"
"github.com/apex/log"
"github.com/gomodule/redigo/redis"
"io"
"runtime/debug"
"unsafe"
)
// BytesToString converts b to a string without copying, via an unsafe
// pointer cast. The returned string ALIASES b's backing array: the
// caller must not mutate b afterwards, or the supposedly immutable
// string will change underneath other code.
func BytesToString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
// GetJson serializes a NodeMessage to its JSON string form. On marshal
// failure it logs and returns "".
func GetJson(message entity.NodeMessage) string {
	data, err := json.Marshal(&message)
	if err != nil {
		log.Errorf("node message to json error: %s", err.Error())
		debug.PrintStack()
		return ""
	}
	return BytesToString(data)
}
// GetMessage decodes a Redis pub/sub payload into a NodeMessage.
// Returns nil (after logging) when the payload is not valid JSON.
func GetMessage(message redis.Message) *entity.NodeMessage {
	var msg entity.NodeMessage
	err := json.Unmarshal(message.Data, &msg)
	if err != nil {
		log.Errorf("message byte to object error: %s", err.Error())
		debug.PrintStack()
		return nil
	}
	return &msg
}
// Close closes c and logs (rather than returns) any error — intended
// for use in defer statements where the error cannot be handled.
func Close(c io.Closer) {
	if err := c.Close(); err != nil {
		log.WithError(err).Error("关闭资源文件失败。")
	}
}

View File

@@ -12,15 +12,16 @@ func IsObjectIdNull(id bson.ObjectId) bool {
}
func InterfaceToString(value interface{}) string {
switch value.(type) {
switch realValue := value.(type) {
case bson.ObjectId:
return value.(bson.ObjectId).Hex()
return realValue.Hex()
case string:
return value.(string)
return realValue
case int:
return strconv.Itoa(value.(int))
return strconv.Itoa(realValue)
case time.Time:
return value.(time.Time).String()
return realValue.String()
default:
return ""
}
return ""
}

View File

@@ -8,7 +8,7 @@ import (
// EncryptPassword returns the lowercase hex MD5 digest of str, as used
// for stored user credentials.
// NOTE(review): MD5 is not a secure password hash; kept only for
// compatibility with existing stored passwords.
//
// Fix: the block contained two WriteString calls (diff artifact /
// duplicated write), which would hash str twice; the input is written
// exactly once, with the io.WriteString error explicitly discarded
// (writes to a hash.Hash never fail).
func EncryptPassword(str string) string {
	w := md5.New()
	_, _ = io.WriteString(w, str)
	md5str := fmt.Sprintf("%x", w.Sum(nil))
	return md5str
}

View File

@@ -6,7 +6,7 @@ then
:
else
jspath=`ls /app/dist/js/app.*.js`
sed -i "s?localhost:8000?${CRAWLAB_API_ADDRESS}?g" ${jspath}
sed -i "s?http://localhost:8000?${CRAWLAB_API_ADDRESS}?g" ${jspath}
fi
# replace base url

View File

@@ -1,9 +1,9 @@
{
"name": "crawlab",
"version": "0.2.3",
"version": "0.3.5",
"private": true,
"scripts": {
"serve": "vue-cli-service serve --ip=0.0.0.0",
"serve": "vue-cli-service serve --ip=0.0.0.0 --mode=development",
"serve:prod": "vue-cli-service serve --mode=production --ip=0.0.0.0",
"config": "vue ui",
"build:dev": "vue-cli-service build --mode development",

View File

@@ -9,9 +9,8 @@
<el-form label-width="80px">
<el-form-item :label="$t('Node')">
<el-select v-model="nodeId">
<el-option value="" :label="$t('All Nodes')"/>
<el-option
v-for="op in $store.state.node.nodeList"
v-for="op in nodeList"
:key="op._id"
:value="op._id"
:disabled="op.status !== 'online'"
@@ -31,6 +30,7 @@
</template>
<script>
import request from '../../api/request'
export default {
name: 'CrawlConfirmDialog',
props: {
@@ -46,7 +46,8 @@ export default {
data () {
return {
nodeId: '',
param: ''
param: '',
nodeList: []
}
},
methods: {
@@ -61,6 +62,20 @@ export default {
this.$emit('close')
this.$st.sendEv('爬虫', '运行')
}
},
created () {
// 节点列表
request.get('/nodes', {}).then(response => {
this.nodeList = response.data.data.map(d => {
d.systemInfo = {
os: '',
arch: '',
num_cpu: '',
executables: []
}
return d
})
})
}
}
</script>

View File

@@ -151,7 +151,7 @@ export default {
}
},
mounted () {
if (!this.spiderList || !this.spiderList.length) this.$store.dispatch('spider/getSpiderList')
// if (!this.spiderList || !this.spiderList.length) this.$store.dispatch('spider/getSpiderList')
if (!this.nodeList || !this.nodeList.length) this.$store.dispatch('node/getNodeList')
}
}

View File

@@ -18,10 +18,10 @@
<el-form-item :label="$t('Spider Name')">
<el-input v-model="spiderForm.display_name" :placeholder="$t('Spider Name')" :disabled="isView"></el-input>
</el-form-item>
<el-form-item v-if="isCustomized" :label="$t('Source Folder')">
<el-form-item :label="$t('Source Folder')">
<el-input v-model="spiderForm.src" :placeholder="$t('Source Folder')" disabled></el-input>
</el-form-item>
<el-form-item v-if="isCustomized" :label="$t('Execute Command')" prop="cmd" required :inline-message="true">
<el-form-item :label="$t('Execute Command')" prop="cmd" required :inline-message="true">
<el-input v-model="spiderForm.cmd" :placeholder="$t('Execute Command')"
:disabled="isView"></el-input>
</el-form-item>
@@ -39,10 +39,11 @@
</el-autocomplete>
</el-form-item>
<el-form-item :label="$t('Spider Type')">
<el-select v-model="spiderForm.type" :placeholder="$t('Spider Type')" :disabled="true" clearable>
<el-option value="configurable" :label="$t('Configurable')"></el-option>
<el-option value="customized" :label="$t('Customized')"></el-option>
</el-select>
<!--<el-select v-model="spiderForm.type" :placeholder="$t('Spider Type')" :disabled="true" clearable>-->
<!--<el-option value="configurable" :label="$t('Configurable')"></el-option>-->
<!--<el-option value="customized" :label="$t('Customized')"></el-option>-->
<!--</el-select>-->
<el-input v-model="spiderForm.type" placeholder="爬虫类型" clearable/>
</el-form-item>
<el-form-item :label="$t('Remark')">
<el-input v-model="spiderForm.remark"/>
@@ -102,16 +103,7 @@ export default {
'spiderForm'
]),
isShowRun () {
if (this.isCustomized) {
// customized spider
return !!this.spiderForm.cmd
} else {
// configurable spider
return !!this.spiderForm.fields
}
},
isCustomized () {
return this.spiderForm.type === 'customized'
return !!this.spiderForm.cmd
}
},
methods: {

View File

@@ -4,6 +4,8 @@ const state = {
// list of spiders
spiderList: [],
spiderTotal: 0,
// active spider data
spiderForm: {},
@@ -38,6 +40,9 @@ const state = {
const getters = {}
const mutations = {
SET_SPIDER_TOTAL (state, value) {
state.spiderTotal = value
},
SET_SPIDER_FORM (state, value) {
state.spiderForm = value
},
@@ -71,14 +76,11 @@ const mutations = {
}
const actions = {
getSpiderList ({ state, commit }) {
let params = {}
if (state.filterSite) {
params.site = state.filterSite
}
getSpiderList ({ state, commit }, params = {}) {
return request.get('/spiders', params)
.then(response => {
commit('SET_SPIDER_LIST', response.data.data)
commit('SET_SPIDER_LIST', response.data.data.list)
commit('SET_SPIDER_TOTAL', response.data.data.total)
})
},
editSpider ({ state, dispatch }) {

View File

@@ -139,7 +139,7 @@ const actions = {
cancelTask ({ state, dispatch }, id) {
return request.post(`/tasks/${id}/cancel`)
.then(() => {
dispatch('getTaskData')
dispatch('getTaskData', id)
})
}
}

View File

@@ -8,6 +8,9 @@
<i class="el-icon-arrow-down el-icon--right"></i>
</span>
<el-dropdown-menu slot="dropdown" class="user-dropdown">
<el-dropdown-item>
<span style="display:block;">v0.3.5</span>
</el-dropdown-item>
<el-dropdown-item>
<span style="display:block;" @click="logout">{{$t('Logout')}}</span>
</el-dropdown-item>

View File

@@ -59,7 +59,7 @@ export default {
},
created () {
// get the list of the spiders
this.$store.dispatch('spider/getSpiderList')
// this.$store.dispatch('spider/getSpiderList')
// get spider basic info
this.$store.dispatch('spider/getSpiderData', this.$route.params.id)

View File

@@ -195,7 +195,7 @@ export default {
this.dialogVisible = true
},
onRefresh () {
this.$store.dispatch('spider/getSpiderList')
// this.$store.dispatch('spider/getSpiderList')
},
onSubmit () {
const vm = this
@@ -257,7 +257,7 @@ export default {
}
},
created () {
this.$store.dispatch('spider/getSpiderList')
// this.$store.dispatch('spider/getSpiderList')
}
}
</script>

View File

@@ -14,9 +14,9 @@
<el-form-item :label="$t('Schedule Name')" prop="name" required>
<el-input v-model="scheduleForm.name" :placeholder="$t('Schedule Name')"></el-input>
</el-form-item>
<el-form-item :label="$t('Node')" prop="node_id">
<el-form-item :label="$t('Node')" prop="node_id" required>
<el-select v-model="scheduleForm.node_id">
<el-option :label="$t('All Nodes')" value="000000000000000000000000"></el-option>
<!--<el-option :label="$t('All Nodes')" value="000000000000000000000000"></el-option>-->
<el-option
v-for="op in nodeList"
:key="op._id"
@@ -38,21 +38,22 @@
</el-option>
</el-select>
</el-form-item>
<el-form-item :label="$t('schedules.cron')" prop="cron" :rules="cronRules" required>
<template slot="label">
<el-tooltip :content="$t('schedules.cron_format')"
placement="top">
<span>
{{$t('schedules.cron')}}
<i class="fa fa-exclamation-circle"></i>
</span>
</el-tooltip>
</template>
<el-input style="width:calc(100% - 100px);padding-right:10px"
<!--:rules="cronRules"-->
<el-form-item :label="$t('schedules.cron')" prop="cron" required>
<!--<template slot="label">-->
<!--<el-tooltip :content="$t('schedules.cron_format')"-->
<!--placement="top">-->
<!--<span>-->
<!--{{$t('schedules.cron')}}-->
<!--<i class="fa fa-exclamation-circle"></i>-->
<!--</span>-->
<!--</el-tooltip>-->
<!--</template>-->
<el-input style="padding-right:10px"
v-model="scheduleForm.cron"
:placeholder="$t('schedules.cron')">
</el-input>
<el-button size="small" style="width:100px" type="primary" @click="onShowCronDialog">{{$t('schedules.add_cron')}}</el-button>
<!--<el-button size="small" style="width:100px" type="primary" @click="onShowCronDialog">{{$t('schedules.add_cron')}}</el-button>-->
</el-form-item>
<el-form-item :label="$t('Execute Command')" prop="params">
<el-input v-model="spider.cmd"
@@ -69,6 +70,7 @@
:placeholder="$t('Schedule Description')"></el-input>
</el-form-item>
</el-form>
<!--取消保存-->
<span slot="footer" class="dialog-footer">
<el-button size="small" @click="onCancel">{{$t('Cancel')}}</el-button>
<el-button size="small" type="primary" @click="onAddSubmit">{{$t('Submit')}}</el-button>
@@ -76,9 +78,9 @@
</el-dialog>
<!--cron generation popup-->
<el-dialog title="生成 Cron" :visible.sync="showCron">
<vcrontab @hide="showCron=false" @fill="onCrontabFill" :expression="expression"></vcrontab>
</el-dialog>
<!--<el-dialog title="生成 Cron" :visible.sync="showCron">-->
<!--<vcrontab @hide="showCron=false" @fill="onCrontabFill" :expression="expression"></vcrontab>-->
<!--</el-dialog>-->
<el-card style="border-radius: 0">
<!--filter-->
@@ -131,28 +133,15 @@
</template>
<script>
import vcrontab from 'vcrontab'
// import vcrontab from 'vcrontab'
import request from '../../api/request'
import {
mapState
} from 'vuex'
export default {
name: 'ScheduleList',
components: { vcrontab },
data () {
const cronValidator = (rule, value, callback) => {
let patArr = []
for (let i = 0; i < 6; i++) {
patArr.push('[/*,0-9-]+')
}
const pat = '^' + patArr.join(' ') + '( [/*,0-9-]+)?' + '$'
if (!value) {
callback(new Error('cron cannot be empty'))
} else if (!value.match(pat)) {
callback(new Error('cron format is invalid'))
}
callback()
}
return {
columns: [
{ name: 'name', label: 'Name', width: '180' },
@@ -165,11 +154,10 @@ export default {
isEdit: false,
dialogTitle: '',
dialogVisible: false,
cronRules: [
{ validator: cronValidator, trigger: 'blur' }
],
showCron: false,
expression: ''
expression: '',
spiderList: [],
nodeList: []
}
},
computed: {
@@ -177,12 +165,6 @@ export default {
'scheduleList',
'scheduleForm'
]),
...mapState('spider', [
'spiderList'
]),
...mapState('node', [
'nodeList'
]),
filteredTableData () {
return this.scheduleList
},
@@ -211,19 +193,25 @@ export default {
onAddSubmit () {
this.$refs.scheduleForm.validate(res => {
if (res) {
let action
if (this.isEdit) {
action = 'editSchedule'
} else {
action = 'addSchedule'
}
this.$store.dispatch('schedule/' + action, this.scheduleForm._id)
.then(() => {
request.post(`/schedules/${this.scheduleForm._id}`, this.scheduleForm).then(response => {
if (response.data.error) {
this.$message.error(response.data.error)
return
}
this.dialogVisible = false
setTimeout(() => {
this.$store.dispatch('schedule/getScheduleList')
}, 100)
this.$store.dispatch('schedule/getScheduleList')
})
} else {
request.put('/schedules', this.scheduleForm).then(response => {
if (response.data.error) {
this.$message.error(response.data.error)
return
}
this.dialogVisible = false
this.$store.dispatch('schedule/getScheduleList')
})
}
}
})
this.$st.sendEv('定时任务', '提交')
@@ -269,8 +257,25 @@ export default {
},
created () {
this.$store.dispatch('schedule/getScheduleList')
this.$store.dispatch('spider/getSpiderList')
this.$store.dispatch('node/getNodeList')
// 节点列表
request.get('/nodes', {}).then(response => {
this.nodeList = response.data.data.map(d => {
d.systemInfo = {
os: '',
arch: '',
num_cpu: '',
executables: []
}
return d
})
})
// 爬虫列表
request.get('/spiders', {})
.then(response => {
this.spiderList = response.data.data.list
})
}
}
</script>

View File

@@ -16,7 +16,7 @@
<el-tab-pane v-if="isConfigurable" :label="$t('Config')" name="配置">
<config-list/>
</el-tab-pane>
<el-tab-pane v-if="isCustomized" :label="$t('Files')" name="files">
<el-tab-pane :label="$t('Files')" name="files">
<file-list/>
</el-tab-pane>
<el-tab-pane :label="$t('Environment')" name="environment">
@@ -87,7 +87,7 @@ export default {
},
created () {
// get the list of the spiders
this.$store.dispatch('spider/getSpiderList')
// this.$store.dispatch('spider/getSpiderList')
// get spider basic info
this.$store.dispatch('spider/getSpiderData', this.$route.params.id)

View File

@@ -108,19 +108,20 @@
<el-card style="border-radius: 0">
<!--filter-->
<div class="filter">
<!--<el-input prefix-icon="el-icon-search"-->
<!--:placeholder="$t('Search')"-->
<!--class="filter-search"-->
<!--v-model="filter.keyword"-->
<!--@change="onSearch">-->
<!--</el-input>-->
<div class="left">
<el-autocomplete size="small" v-model="filterSite"
:placeholder="$t('Site')"
clearable
:fetch-suggestions="fetchSiteSuggestions"
@select="onSiteSelect">
</el-autocomplete>
<el-form :inline="true">
<el-form-item>
<el-select clearable @change="onSpiderTypeChange" placeholder="爬虫类型" size="small" v-model="filter.type">
<el-option v-for="item in types" :value="item.type" :key="item.type"
:label="item.type === 'customized'? '自定义':item.type "/>
</el-select>
</el-form-item>
<el-form-item>
<el-input clearable @keyup.enter.native="onSearch" size="small" placeholder="名称" v-model="filter.keyword">
<i slot="suffix" class="el-input__icon el-icon-search"></i>
</el-input>
</el-form-item>
</el-form>
</div>
<div class="right">
<el-button size="small" v-if="false" type="primary" icon="fa fa-download" @click="openImportDialog">
@@ -143,7 +144,7 @@
<!--./filter-->
<!--table list-->
<el-table :data="filteredTableData"
<el-table :data="spiderList"
class="table"
:header-cell-style="{background:'rgb(48, 65, 86)',color:'white'}"
border
@@ -156,8 +157,7 @@
align="left"
:width="col.width">
<template slot-scope="scope">
<el-tag type="success" v-if="scope.row.type === 'configurable'">{{$t('Configurable')}}</el-tag>
<el-tag type="primary" v-else-if="scope.row.type === 'customized'">{{$t('Customized')}}</el-tag>
{{scope.row.type === 'customized' ? '自定义' : scope.row.type}}
</template>
</el-table-column>
<el-table-column v-else-if="col.name === 'last_5_errors'"
@@ -226,13 +226,13 @@
</el-table>
<div class="pagination">
<el-pagination
@current-change="onPageChange"
@size-change="onPageChange"
@current-change="onPageNumChange"
@size-change="onPageSizeChange"
:current-page.sync="pagination.pageNum"
:page-sizes="[10, 20, 50, 100]"
:page-size.sync="pagination.pageSize"
layout="sizes, prev, pager, next"
:total="spiderList.length">
:total="spiderTotal">
</el-pagination>
</div>
<!--./table list-->
@@ -248,7 +248,7 @@ import {
import dayjs from 'dayjs'
import CrawlConfirmDialog from '../../components/Common/CrawlConfirmDialog'
import StatusTag from '../../components/Status/StatusTag'
import request from '../../api/request'
export default {
name: 'SpiderList',
components: {
@@ -258,7 +258,7 @@ export default {
data () {
return {
pagination: {
pageNum: 0,
pageNum: 1,
pageSize: 10
},
importLoading: false,
@@ -271,21 +271,18 @@ export default {
crawlConfirmDialogVisible: false,
activeSpiderId: undefined,
filter: {
keyword: ''
keyword: '',
type: ''
},
types: [],
// tableData,
columns: [
{ name: 'name', label: 'Name', width: '160', align: 'left' },
// { name: 'site_name', label: 'Site', width: '140', align: 'left' },
{ name: 'display_name', label: 'Name', width: '160', align: 'left' },
{ name: 'type', label: 'Spider Type', width: '120' },
// { name: 'cmd', label: 'Command Line', width: '200' },
{ name: 'last_status', label: 'Last Status', width: '120' },
{ name: 'last_run_ts', label: 'Last Run', width: '140' },
{ name: 'create_ts', label: 'Create Time', width: '140' },
{ name: 'update_ts', label: 'Update Time', width: '140' },
// { name: 'update_ts', label: 'Update Time', width: '140' },
{ name: 'remark', label: 'Remark', width: '140' }
// { name: 'last_7d_tasks', label: 'Last 7-Day Tasks', width: '80' },
// { name: 'last_5_errors', label: 'Last 5-Run Errors', width: '80' }
],
spiderFormRules: {
name: [{ required: true, message: 'Required Field', trigger: 'change' }]
@@ -297,45 +294,28 @@ export default {
...mapState('spider', [
'importForm',
'spiderList',
'spiderForm'
'spiderForm',
'spiderTotal'
]),
...mapGetters('user', [
'token'
]),
filteredTableData () {
return this.spiderList
.filter(d => {
if (this.filterSite) {
return d.site === this.filterSite
}
return true
})
.filter((d, index) => {
return (this.pagination.pageSize * (this.pagination.pageNum - 1)) <= index && (index < this.pagination.pageSize * this.pagination.pageNum)
})
// .filter(d => {
// if (!this.filter.keyword) return true
// for (let i = 0; i < this.columns.length; i++) {
// const colName = this.columns[i].name
// if (d[colName] && d[colName].toLowerCase().indexOf(this.filter.keyword.toLowerCase()) > -1) {
// return true
// }
// }
// return false
// })
},
filterSite: {
get () {
return this.$store.state.spider.filterSite
},
set (value) {
this.$store.commit('spider/SET_FILTER_SITE', value)
}
}
])
},
methods: {
onSearch (value) {
console.log(value)
onSpiderTypeChange (val) {
this.filter.type = val
this.getList()
},
onPageSizeChange (val) {
this.pagination.pageSize = val
this.getList()
},
onPageNumChange (val) {
this.pagination.pageNum = val
this.getList()
},
onSearch () {
this.getList()
},
onAdd () {
// this.addDialogVisible = true
@@ -353,7 +333,7 @@ export default {
this.$st.sendEv('爬虫', '添加爬虫-自定义爬虫')
},
onRefresh () {
this.$store.dispatch('spider/getSpiderList')
this.getList()
this.$st.sendEv('爬虫', '刷新')
},
onSubmit () {
@@ -376,9 +356,6 @@ export default {
this.$store.commit('spider/SET_SPIDER_FORM', {})
this.dialogVisible = false
},
onAddCancel () {
this.addDialogVisible = false
},
onDialogClose () {
this.$store.commit('spider/SET_SPIDER_FORM', {})
this.dialogVisible = false
@@ -422,9 +399,6 @@ export default {
this.$router.push('/spiders/' + row._id)
this.$st.sendEv('爬虫', '查看')
},
onPageChange () {
this.$store.dispatch('spider/getSpiderList')
},
onImport () {
this.$refs.importForm.validate(valid => {
if (valid) {
@@ -433,7 +407,7 @@ export default {
this.$store.dispatch('spider/importGithub')
.then(response => {
this.$message.success('Import repo successfully')
this.$store.dispatch('spider/getSpiderList')
this.getList()
})
.catch(response => {
this.$message.error(response.data.error)
@@ -450,12 +424,10 @@ export default {
this.dialogVisible = true
},
isShowRun (row) {
if (this.isCustomized(row)) {
// customized spider
return !!row.cmd
if (row.cmd) {
return true
} else {
// configurable spider
return !!row.fields
return false
}
},
isCustomized (row) {
@@ -501,7 +473,7 @@ export default {
// fetch spider list
setTimeout(() => {
this.$store.dispatch('spider/getSpiderList')
this.getList()
}, 500)
// close popup
@@ -515,14 +487,26 @@ export default {
if (column.label !== this.$t('Action')) {
this.onView(row)
}
},
getList () {
let params = {
pageNum: this.pagination.pageNum,
pageSize: this.pagination.pageSize,
keyword: this.filter.keyword,
type: this.filter.type
}
this.$store.dispatch('spider/getSpiderList', params)
},
getTypes () {
request.get(`/spider/types`).then(resp => {
this.types = resp.data.data
})
}
},
created () {
// take site from params to filter
this.$store.commit('spider/SET_FILTER_SITE', this.$route.params.domain)
this.getTypes()
// fetch spider list
this.$store.dispatch('spider/getSpiderList')
this.getList()
},
mounted () {
}

View File

@@ -35,6 +35,7 @@ import {
import TaskOverview from '../../components/Overview/TaskOverview'
import GeneralTableView from '../../components/TableView/GeneralTableView'
import LogView from '../../components/ScrollView/LogView'
import request from '../../api/request'
export default {
name: 'TaskDetail',
@@ -46,12 +47,12 @@ export default {
data () {
return {
activeTabName: 'overview',
handle: undefined
handle: undefined,
taskLog: ''
}
},
computed: {
...mapState('task', [
'taskLog',
'taskResultsData',
'taskResultsTotalCount'
]),
@@ -97,18 +98,23 @@ export default {
downloadCSV () {
this.$store.dispatch('task/getTaskResultExcel', this.$route.params.id)
this.$st.sendEv('任务详情-结果', '下载CSV')
},
getTaskLog () {
if (this.$route.params.id) {
request.get(`/tasks/${this.$route.params.id}/log`).then(response => {
this.taskLog = response.data.data
})
}
}
},
async created () {
await this.$store.dispatch('task/getTaskData', this.$route.params.id)
this.$store.dispatch('task/getTaskLog', this.$route.params.id)
created () {
this.$store.dispatch('task/getTaskData', this.$route.params.id)
this.$store.dispatch('task/getTaskResults', this.$route.params.id)
if (this.taskForm && ['running'].includes(this.taskForm.status)) {
this.handle = setInterval(() => {
this.$store.dispatch('task/getTaskLog', this.$route.params.id)
}, 5000)
}
this.getTaskLog()
this.handle = setInterval(() => {
this.getTaskLog()
}, 5000)
},
destroyed () {
clearInterval(this.handle)

View File

@@ -4,28 +4,28 @@
<!--filter-->
<div class="filter">
<div class="left">
<el-select size="small" class="filter-select"
v-model="filter.node_id"
:placeholder="$t('Node')"
filterable
clearable
@change="onSelectNode">
<el-option v-for="op in nodeList" :key="op._id" :value="op._id" :label="op.name"></el-option>
</el-select>
<el-select size="small" class="filter-select"
v-model="filter.spider_id"
:placeholder="$t('Spider')"
filterable
clearable
@change="onSelectSpider">
<el-option v-for="op in spiderList" :key="op._id" :value="op._id" :label="op.name"></el-option>
</el-select>
<el-button size="small" type="success"
icon="el-icon-search"
class="refresh"
@click="onRefresh">
{{$t('Search')}}
</el-button>
<!--<el-select size="small" class="filter-select"-->
<!--v-model="filter.node_id"-->
<!--:placeholder="$t('Node')"-->
<!--filterable-->
<!--clearable-->
<!--@change="onSelectNode">-->
<!--<el-option v-for="op in nodeList" :key="op._id" :value="op._id" :label="op.name"></el-option>-->
<!--</el-select>-->
<!--<el-select size="small" class="filter-select"-->
<!--v-model="filter.spider_id"-->
<!--:placeholder="$t('Spider')"-->
<!--filterable-->
<!--clearable-->
<!--@change="onSelectSpider">-->
<!--<el-option v-for="op in spiderList" :key="op._id" :value="op._id" :label="op.name"></el-option>-->
<!--</el-select>-->
<!--<el-button size="small" type="success"-->
<!--icon="el-icon-search"-->
<!--class="refresh"-->
<!--@click="onRefresh">-->
<!--{{$t('Search')}}-->
<!--</el-button>-->
</div>
<!--<div class="right">-->
<!--</div>-->