mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-22 17:31:03 +01:00
@@ -14,7 +14,8 @@ ADD ./frontend /app
|
||||
WORKDIR /app
|
||||
|
||||
# install frontend
|
||||
RUN npm install -g yarn && yarn install
|
||||
RUN npm config set unsafe-perm true
|
||||
RUN npm install -g yarn && yarn install --registry=https://registry.npm.taobao.org
|
||||
|
||||
RUN npm run build:prod
|
||||
|
||||
@@ -56,4 +57,4 @@ EXPOSE 8080
|
||||
EXPOSE 8000
|
||||
|
||||
# start backend
|
||||
CMD ["/bin/sh", "/app/docker_init.sh"]
|
||||
CMD ["/bin/sh", "/app/docker_init.sh"]
|
||||
|
||||
@@ -2,6 +2,7 @@ package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crawlab/utils"
|
||||
"fmt"
|
||||
"github.com/apex/log"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
@@ -26,7 +27,7 @@ func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...s
|
||||
tick := time.NewTicker(time.Second * 3)
|
||||
defer tick.Stop()
|
||||
go func() {
|
||||
defer func() { _ = psc.Close() }()
|
||||
defer utils.Close(psc)
|
||||
for {
|
||||
switch msg := psc.Receive().(type) {
|
||||
case error:
|
||||
@@ -87,7 +88,7 @@ func (r *Redis) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...s
|
||||
}
|
||||
func (r *Redis) Publish(channel, message string) (n int, err error) {
|
||||
conn := r.pool.Get()
|
||||
defer func() { _ = conn.Close() }()
|
||||
defer utils.Close(conn)
|
||||
n, err = redis.Int(conn.Do("PUBLISH", channel, message))
|
||||
if err != nil {
|
||||
return 0, errors2.Wrapf(err, "redis publish %s %s", channel, message)
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crawlab/entity"
|
||||
"crawlab/utils"
|
||||
"github.com/apex/log"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/spf13/viper"
|
||||
"runtime/debug"
|
||||
@@ -18,7 +22,7 @@ func NewRedisClient() *Redis {
|
||||
}
|
||||
func (r *Redis) RPush(collection string, value interface{}) error {
|
||||
c := r.pool.Get()
|
||||
defer c.Close()
|
||||
defer utils.Close(c)
|
||||
|
||||
if _, err := c.Do("RPUSH", collection, value); err != nil {
|
||||
debug.PrintStack()
|
||||
@@ -29,7 +33,7 @@ func (r *Redis) RPush(collection string, value interface{}) error {
|
||||
|
||||
func (r *Redis) LPop(collection string) (string, error) {
|
||||
c := r.pool.Get()
|
||||
defer c.Close()
|
||||
defer utils.Close(c)
|
||||
|
||||
value, err2 := redis.String(c.Do("LPOP", collection))
|
||||
if err2 != nil {
|
||||
@@ -40,7 +44,7 @@ func (r *Redis) LPop(collection string) (string, error) {
|
||||
|
||||
func (r *Redis) HSet(collection string, key string, value string) error {
|
||||
c := r.pool.Get()
|
||||
defer c.Close()
|
||||
defer utils.Close(c)
|
||||
|
||||
if _, err := c.Do("HSET", collection, key, value); err != nil {
|
||||
debug.PrintStack()
|
||||
@@ -51,7 +55,7 @@ func (r *Redis) HSet(collection string, key string, value string) error {
|
||||
|
||||
func (r *Redis) HGet(collection string, key string) (string, error) {
|
||||
c := r.pool.Get()
|
||||
defer c.Close()
|
||||
defer utils.Close(c)
|
||||
|
||||
value, err2 := redis.String(c.Do("HGET", collection, key))
|
||||
if err2 != nil {
|
||||
@@ -62,7 +66,7 @@ func (r *Redis) HGet(collection string, key string) (string, error) {
|
||||
|
||||
func (r *Redis) HDel(collection string, key string) error {
|
||||
c := r.pool.Get()
|
||||
defer c.Close()
|
||||
defer utils.Close(c)
|
||||
|
||||
if _, err := c.Do("HDEL", collection, key); err != nil {
|
||||
return err
|
||||
@@ -72,7 +76,7 @@ func (r *Redis) HDel(collection string, key string) error {
|
||||
|
||||
func (r *Redis) HKeys(collection string) ([]string, error) {
|
||||
c := r.pool.Get()
|
||||
defer c.Close()
|
||||
defer utils.Close(c)
|
||||
|
||||
value, err2 := redis.Strings(c.Do("HKeys", collection))
|
||||
if err2 != nil {
|
||||
@@ -120,3 +124,22 @@ func InitRedis() error {
|
||||
RedisClient = NewRedisClient()
|
||||
return nil
|
||||
}
|
||||
|
||||
func Pub(channel string, msg entity.NodeMessage) error {
|
||||
if _, err := RedisClient.Publish(channel, utils.GetJson(msg)); err != nil {
|
||||
log.Errorf("publish redis error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Sub(channel string, consume ConsumeFunc) error {
|
||||
ctx := context.Background()
|
||||
if err := RedisClient.Subscribe(ctx, consume, channel); err != nil {
|
||||
log.Errorf("subscribe redis error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -24,7 +24,6 @@ func (O OPError) Error() string {
|
||||
switch O.Scope {
|
||||
case ScopeSystem:
|
||||
scope = "system"
|
||||
break
|
||||
case ScopeBusiness:
|
||||
scope = "business"
|
||||
}
|
||||
|
||||
@@ -44,17 +44,14 @@ func TestFuncPanicRecovery(t *testing.T) {
|
||||
WithChain(Recover(newBufLogger(&buf))))
|
||||
cron.Start()
|
||||
defer cron.Stop()
|
||||
cron.AddFunc("* * * * * ?", func() {
|
||||
_, _ = cron.AddFunc("* * * * * ?", func() {
|
||||
panic("YOLO")
|
||||
})
|
||||
|
||||
select {
|
||||
case <-time.After(OneSecond):
|
||||
if !strings.Contains(buf.String(), "YOLO") {
|
||||
t.Error("expected a panic to be logged, got none")
|
||||
}
|
||||
return
|
||||
<-time.After(OneSecond)
|
||||
if !strings.Contains(buf.String(), "YOLO") {
|
||||
t.Error("expected a panic to be logged, got none")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type DummyJob struct{}
|
||||
@@ -71,7 +68,7 @@ func TestJobPanicRecovery(t *testing.T) {
|
||||
WithChain(Recover(newBufLogger(&buf))))
|
||||
cron.Start()
|
||||
defer cron.Stop()
|
||||
cron.AddJob("* * * * * ?", job)
|
||||
_, _ = cron.AddJob("* * * * * ?", job)
|
||||
|
||||
select {
|
||||
case <-time.After(OneSecond):
|
||||
@@ -102,7 +99,7 @@ func TestStopCausesJobsToNotRun(t *testing.T) {
|
||||
cron := newWithSeconds()
|
||||
cron.Start()
|
||||
cron.Stop()
|
||||
cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
|
||||
select {
|
||||
case <-time.After(OneSecond):
|
||||
@@ -118,7 +115,7 @@ func TestAddBeforeRunning(t *testing.T) {
|
||||
wg.Add(1)
|
||||
|
||||
cron := newWithSeconds()
|
||||
cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
cron.Start()
|
||||
defer cron.Stop()
|
||||
|
||||
@@ -138,7 +135,7 @@ func TestAddWhileRunning(t *testing.T) {
|
||||
cron := newWithSeconds()
|
||||
cron.Start()
|
||||
defer cron.Stop()
|
||||
cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
|
||||
select {
|
||||
case <-time.After(OneSecond):
|
||||
@@ -154,7 +151,7 @@ func TestAddWhileRunningWithDelay(t *testing.T) {
|
||||
defer cron.Stop()
|
||||
time.Sleep(5 * time.Second)
|
||||
var calls int64
|
||||
cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) })
|
||||
_, _ = cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) })
|
||||
|
||||
<-time.After(OneSecond)
|
||||
if atomic.LoadInt64(&calls) != 1 {
|
||||
@@ -205,7 +202,7 @@ func TestSnapshotEntries(t *testing.T) {
|
||||
wg.Add(1)
|
||||
|
||||
cron := New()
|
||||
cron.AddFunc("@every 2s", func() { wg.Done() })
|
||||
_, _ = cron.AddFunc("@every 2s", func() { wg.Done() })
|
||||
cron.Start()
|
||||
defer cron.Stop()
|
||||
|
||||
@@ -232,12 +229,12 @@ func TestMultipleEntries(t *testing.T) {
|
||||
wg.Add(2)
|
||||
|
||||
cron := newWithSeconds()
|
||||
cron.AddFunc("0 0 0 1 1 ?", func() {})
|
||||
cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
_, _ = cron.AddFunc("0 0 0 1 1 ?", func() {})
|
||||
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
id1, _ := cron.AddFunc("* * * * * ?", func() { t.Fatal() })
|
||||
id2, _ := cron.AddFunc("* * * * * ?", func() { t.Fatal() })
|
||||
cron.AddFunc("0 0 0 31 12 ?", func() {})
|
||||
cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
_, _ = cron.AddFunc("0 0 0 31 12 ?", func() {})
|
||||
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
|
||||
cron.Remove(id1)
|
||||
cron.Start()
|
||||
@@ -257,9 +254,9 @@ func TestRunningJobTwice(t *testing.T) {
|
||||
wg.Add(2)
|
||||
|
||||
cron := newWithSeconds()
|
||||
cron.AddFunc("0 0 0 1 1 ?", func() {})
|
||||
cron.AddFunc("0 0 0 31 12 ?", func() {})
|
||||
cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
_, _ = cron.AddFunc("0 0 0 1 1 ?", func() {})
|
||||
_, _ = cron.AddFunc("0 0 0 31 12 ?", func() {})
|
||||
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
|
||||
cron.Start()
|
||||
defer cron.Stop()
|
||||
@@ -276,9 +273,9 @@ func TestRunningMultipleSchedules(t *testing.T) {
|
||||
wg.Add(2)
|
||||
|
||||
cron := newWithSeconds()
|
||||
cron.AddFunc("0 0 0 1 1 ?", func() {})
|
||||
cron.AddFunc("0 0 0 31 12 ?", func() {})
|
||||
cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
_, _ = cron.AddFunc("0 0 0 1 1 ?", func() {})
|
||||
_, _ = cron.AddFunc("0 0 0 31 12 ?", func() {})
|
||||
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
cron.Schedule(Every(time.Minute), FuncJob(func() {}))
|
||||
cron.Schedule(Every(time.Second), FuncJob(func() { wg.Done() }))
|
||||
cron.Schedule(Every(time.Hour), FuncJob(func() {}))
|
||||
@@ -310,7 +307,7 @@ func TestLocalTimezone(t *testing.T) {
|
||||
now.Second()+1, now.Second()+2, now.Minute(), now.Hour(), now.Day(), now.Month())
|
||||
|
||||
cron := newWithSeconds()
|
||||
cron.AddFunc(spec, func() { wg.Done() })
|
||||
_, _ = cron.AddFunc(spec, func() { wg.Done() })
|
||||
cron.Start()
|
||||
defer cron.Stop()
|
||||
|
||||
@@ -344,7 +341,7 @@ func TestNonLocalTimezone(t *testing.T) {
|
||||
now.Second()+1, now.Second()+2, now.Minute(), now.Hour(), now.Day(), now.Month())
|
||||
|
||||
cron := New(WithLocation(loc), WithParser(secondParser))
|
||||
cron.AddFunc(spec, func() { wg.Done() })
|
||||
_, _ = cron.AddFunc(spec, func() { wg.Done() })
|
||||
cron.Start()
|
||||
defer cron.Stop()
|
||||
|
||||
@@ -386,7 +383,7 @@ func TestBlockingRun(t *testing.T) {
|
||||
wg.Add(1)
|
||||
|
||||
cron := newWithSeconds()
|
||||
cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
_, _ = cron.AddFunc("* * * * * ?", func() { wg.Done() })
|
||||
|
||||
var unblockChan = make(chan struct{})
|
||||
|
||||
@@ -410,7 +407,7 @@ func TestStartNoop(t *testing.T) {
|
||||
var tickChan = make(chan struct{}, 2)
|
||||
|
||||
cron := newWithSeconds()
|
||||
cron.AddFunc("* * * * * ?", func() {
|
||||
_, _ = cron.AddFunc("* * * * * ?", func() {
|
||||
tickChan <- struct{}{}
|
||||
})
|
||||
|
||||
@@ -438,10 +435,10 @@ func TestJob(t *testing.T) {
|
||||
wg.Add(1)
|
||||
|
||||
cron := newWithSeconds()
|
||||
cron.AddJob("0 0 0 30 Feb ?", testJob{wg, "job0"})
|
||||
cron.AddJob("0 0 0 1 1 ?", testJob{wg, "job1"})
|
||||
_, _ = cron.AddJob("0 0 0 30 Feb ?", testJob{wg, "job0"})
|
||||
_, _ = cron.AddJob("0 0 0 1 1 ?", testJob{wg, "job1"})
|
||||
job2, _ := cron.AddJob("* * * * * ?", testJob{wg, "job2"})
|
||||
cron.AddJob("1 0 0 1 1 ?", testJob{wg, "job3"})
|
||||
_, _ = cron.AddJob("1 0 0 1 1 ?", testJob{wg, "job3"})
|
||||
cron.Schedule(Every(5*time.Second+5*time.Nanosecond), testJob{wg, "job4"})
|
||||
job5 := cron.Schedule(Every(5*time.Minute), testJob{wg, "job5"})
|
||||
|
||||
@@ -465,7 +462,7 @@ func TestJob(t *testing.T) {
|
||||
// Ensure the entries are in the right order.
|
||||
expecteds := []string{"job2", "job4", "job5", "job1", "job3", "job0"}
|
||||
|
||||
var actuals []string
|
||||
var actuals = make([]string, 0, len(cron.Entries()))
|
||||
for _, entry := range cron.Entries() {
|
||||
actuals = append(actuals, entry.Job.(testJob).name)
|
||||
}
|
||||
@@ -545,7 +542,7 @@ func (*ZeroSchedule) Next(time.Time) time.Time {
|
||||
func TestJobWithZeroTimeDoesNotRun(t *testing.T) {
|
||||
cron := newWithSeconds()
|
||||
var calls int64
|
||||
cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) })
|
||||
_, _ = cron.AddFunc("* * * * * *", func() { atomic.AddInt64(&calls, 1) })
|
||||
cron.Schedule(new(ZeroSchedule), FuncJob(func() { t.Error("expected zero task will not run") }))
|
||||
cron.Start()
|
||||
defer cron.Stop()
|
||||
@@ -582,11 +579,11 @@ func TestStopAndWait(t *testing.T) {
|
||||
|
||||
t.Run("a couple fast jobs added, still returns immediately", func(t *testing.T) {
|
||||
cron := newWithSeconds()
|
||||
cron.AddFunc("* * * * * *", func() {})
|
||||
_, _ = cron.AddFunc("* * * * * *", func() {})
|
||||
cron.Start()
|
||||
cron.AddFunc("* * * * * *", func() {})
|
||||
cron.AddFunc("* * * * * *", func() {})
|
||||
cron.AddFunc("* * * * * *", func() {})
|
||||
_, _ = cron.AddFunc("* * * * * *", func() {})
|
||||
_, _ = cron.AddFunc("* * * * * *", func() {})
|
||||
_, _ = cron.AddFunc("* * * * * *", func() {})
|
||||
time.Sleep(time.Second)
|
||||
ctx := cron.Stop()
|
||||
select {
|
||||
@@ -598,10 +595,10 @@ func TestStopAndWait(t *testing.T) {
|
||||
|
||||
t.Run("a couple fast jobs and a slow job added, waits for slow job", func(t *testing.T) {
|
||||
cron := newWithSeconds()
|
||||
cron.AddFunc("* * * * * *", func() {})
|
||||
_, _ = cron.AddFunc("* * * * * *", func() {})
|
||||
cron.Start()
|
||||
cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) })
|
||||
cron.AddFunc("* * * * * *", func() {})
|
||||
_, _ = cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) })
|
||||
_, _ = cron.AddFunc("* * * * * *", func() {})
|
||||
time.Sleep(time.Second)
|
||||
|
||||
ctx := cron.Stop()
|
||||
@@ -625,10 +622,10 @@ func TestStopAndWait(t *testing.T) {
|
||||
|
||||
t.Run("repeated calls to stop, waiting for completion and after", func(t *testing.T) {
|
||||
cron := newWithSeconds()
|
||||
cron.AddFunc("* * * * * *", func() {})
|
||||
cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) })
|
||||
_, _ = cron.AddFunc("* * * * * *", func() {})
|
||||
_, _ = cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) })
|
||||
cron.Start()
|
||||
cron.AddFunc("* * * * * *", func() {})
|
||||
_, _ = cron.AddFunc("* * * * * *", func() {})
|
||||
time.Sleep(time.Second)
|
||||
ctx := cron.Stop()
|
||||
ctx2 := cron.Stop()
|
||||
|
||||
@@ -9,10 +9,10 @@ import (
|
||||
)
|
||||
|
||||
// DefaultLogger is used by Cron if none is specified.
|
||||
var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))
|
||||
var DefaultLogger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))
|
||||
|
||||
// DiscardLogger can be used by callers to discard all log messages.
|
||||
var DiscardLogger Logger = PrintfLogger(log.New(ioutil.Discard, "", 0))
|
||||
var DiscardLogger = PrintfLogger(log.New(ioutil.Discard, "", 0))
|
||||
|
||||
// Logger is the interface used in this package for logging, so that any backend
|
||||
// can be plugged in. It is a subset of the github.com/go-logr/logr interface.
|
||||
|
||||
@@ -30,7 +30,7 @@ func TestWithVerboseLogger(t *testing.T) {
|
||||
t.Error("expected provided logger")
|
||||
}
|
||||
|
||||
c.AddFunc("@every 1s", func() {})
|
||||
_, _ = c.AddFunc("@every 1s", func() {})
|
||||
c.Start()
|
||||
time.Sleep(OneSecond)
|
||||
c.Stop()
|
||||
|
||||
@@ -304,6 +304,7 @@ func TestNormalizeFields_Errors(t *testing.T) {
|
||||
actual, err := normalizeFields(test.input, test.options)
|
||||
if err == nil {
|
||||
t.Errorf("expected an error, got none. results: %v", actual)
|
||||
return
|
||||
}
|
||||
if !strings.Contains(err.Error(), test.err) {
|
||||
t.Errorf("expected error %q, got %q", test.err, err.Error())
|
||||
|
||||
@@ -178,8 +178,8 @@ WRAP:
|
||||
// restrictions are satisfied by the given time.
|
||||
func dayMatches(s *SpecSchedule, t time.Time) bool {
|
||||
var (
|
||||
domMatch bool = 1<<uint(t.Day())&s.Dom > 0
|
||||
dowMatch bool = 1<<uint(t.Weekday())&s.Dow > 0
|
||||
domMatch = 1<<uint(t.Day())&s.Dom > 0
|
||||
dowMatch = 1<<uint(t.Weekday())&s.Dow > 0
|
||||
)
|
||||
if s.Dom&starBit > 0 || s.Dow&starBit > 0 {
|
||||
return domMatch && dowMatch
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crawlab/config"
|
||||
"crawlab/database"
|
||||
"crawlab/lib/validate_bridge"
|
||||
@@ -12,7 +13,13 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gin-gonic/gin/binding"
|
||||
"github.com/spf13/viper"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime/debug"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -166,8 +173,26 @@ func main() {
|
||||
// 运行服务器
|
||||
host := viper.GetString("server.host")
|
||||
port := viper.GetString("server.port")
|
||||
if err := app.Run(host + ":" + port); err != nil {
|
||||
address := net.JoinHostPort(host, port)
|
||||
srv := &http.Server{
|
||||
Handler: app,
|
||||
Addr: address,
|
||||
}
|
||||
go func() {
|
||||
if err := srv.ListenAndServe(); err != nil {
|
||||
if err != http.ErrServerClosed {
|
||||
log.Error("run server error:" + err.Error())
|
||||
} else {
|
||||
log.Info("server graceful down")
|
||||
}
|
||||
}
|
||||
}()
|
||||
quit := make(chan os.Signal, 1)
|
||||
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-quit
|
||||
ctx2, cancel := context.WithTimeout(context.Background(), 20*time.Second)
|
||||
defer cancel()
|
||||
if err := srv.Shutdown(ctx2); err != nil {
|
||||
log.Error("run server error:" + err.Error())
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -188,7 +188,7 @@ func DeleteNode(c *gin.Context) {
|
||||
id := bson.ObjectId("5d429e6c19f7abede924fee2")
|
||||
|
||||
for _, node := range NodeList {
|
||||
if node.Id == bson.ObjectId(id) {
|
||||
if node.Id == id {
|
||||
log.Infof("Delete a node")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"crawlab/utils"
|
||||
"github.com/apex/log"
|
||||
"os"
|
||||
"runtime/debug"
|
||||
@@ -21,9 +22,9 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) {
|
||||
debug.PrintStack()
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
defer utils.Close(f)
|
||||
|
||||
const bufLen = 2 * 1024 * 1024
|
||||
const bufLen = 1 * 1024 * 1024
|
||||
logBuf := make([]byte, bufLen)
|
||||
|
||||
off := int64(0)
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"crawlab/constants"
|
||||
"crawlab/database"
|
||||
"crawlab/services/register"
|
||||
"errors"
|
||||
"github.com/apex/log"
|
||||
"github.com/globalsign/mgo"
|
||||
"github.com/globalsign/mgo/bson"
|
||||
@@ -33,7 +34,6 @@ type Node struct {
|
||||
|
||||
const (
|
||||
Yes = "Y"
|
||||
No = "N"
|
||||
)
|
||||
|
||||
// 当前节点是否为主节点
|
||||
@@ -157,15 +157,20 @@ func GetNodeList(filter interface{}) ([]Node, error) {
|
||||
}
|
||||
|
||||
func GetNode(id bson.ObjectId) (Node, error) {
|
||||
var node Node
|
||||
|
||||
if id.Hex() == "" {
|
||||
log.Infof("id is empty")
|
||||
debug.PrintStack()
|
||||
return node, errors.New("id is empty")
|
||||
}
|
||||
|
||||
s, c := database.GetCol("nodes")
|
||||
defer s.Close()
|
||||
|
||||
var node Node
|
||||
if err := c.FindId(id).One(&node); err != nil {
|
||||
if err != mgo.ErrNotFound {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
}
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return node, err
|
||||
}
|
||||
return node, nil
|
||||
|
||||
@@ -16,6 +16,7 @@ type Schedule struct {
|
||||
Description string `json:"description" bson:"description"`
|
||||
SpiderId bson.ObjectId `json:"spider_id" bson:"spider_id"`
|
||||
NodeId bson.ObjectId `json:"node_id" bson:"node_id"`
|
||||
NodeKey string `json:"node_key" bson:"node_key"`
|
||||
Cron string `json:"cron" bson:"cron"`
|
||||
EntryId cron.EntryID `json:"entry_id" bson:"entry_id"`
|
||||
Param string `json:"param" bson:"param"`
|
||||
@@ -38,6 +39,33 @@ func (sch *Schedule) Save() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sch *Schedule) Delete() error {
|
||||
s, c := database.GetCol("schedules")
|
||||
defer s.Close()
|
||||
return c.RemoveId(sch.Id)
|
||||
}
|
||||
|
||||
func (sch *Schedule) SyncNodeIdAndSpiderId(node Node, spider Spider) {
|
||||
sch.syncNodeId(node)
|
||||
sch.syncSpiderId(spider)
|
||||
}
|
||||
|
||||
func (sch *Schedule) syncNodeId(node Node) {
|
||||
if node.Id.Hex() == sch.NodeId.Hex() {
|
||||
return
|
||||
}
|
||||
sch.NodeId = node.Id
|
||||
_ = sch.Save()
|
||||
}
|
||||
|
||||
func (sch *Schedule) syncSpiderId(spider Spider) {
|
||||
if spider.Id.Hex() == sch.SpiderId.Hex() {
|
||||
return
|
||||
}
|
||||
sch.SpiderId = spider.Id
|
||||
_ = sch.Save()
|
||||
}
|
||||
|
||||
func GetScheduleList(filter interface{}) ([]Schedule, error) {
|
||||
s, c := database.GetCol("schedules")
|
||||
defer s.Close()
|
||||
@@ -47,11 +75,12 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) {
|
||||
return schedules, err
|
||||
}
|
||||
|
||||
for i, schedule := range schedules {
|
||||
var schs []Schedule
|
||||
for _, schedule := range schedules {
|
||||
// 获取节点名称
|
||||
if schedule.NodeId == bson.ObjectIdHex(constants.ObjectIdNull) {
|
||||
// 选择所有节点
|
||||
schedules[i].NodeName = "All Nodes"
|
||||
schedule.NodeName = "All Nodes"
|
||||
} else {
|
||||
// 选择单一节点
|
||||
node, err := GetNode(schedule.NodeId)
|
||||
@@ -59,7 +88,7 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) {
|
||||
log.Errorf(err.Error())
|
||||
continue
|
||||
}
|
||||
schedules[i].NodeName = node.Name
|
||||
schedule.NodeName = node.Name
|
||||
}
|
||||
|
||||
// 获取爬虫名称
|
||||
@@ -67,11 +96,13 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) {
|
||||
if err != nil {
|
||||
log.Errorf("get spider by id: %s, error: %s", schedule.SpiderId.Hex(), err.Error())
|
||||
debug.PrintStack()
|
||||
_ = schedule.Delete()
|
||||
continue
|
||||
}
|
||||
schedules[i].SpiderName = spider.Name
|
||||
schedule.SpiderName = spider.Name
|
||||
schs = append(schs, schedule)
|
||||
}
|
||||
return schedules, nil
|
||||
return schs, nil
|
||||
}
|
||||
|
||||
func GetSchedule(id bson.ObjectId) (Schedule, error) {
|
||||
@@ -93,7 +124,12 @@ func UpdateSchedule(id bson.ObjectId, item Schedule) error {
|
||||
if err := c.FindId(id).One(&result); err != nil {
|
||||
return err
|
||||
}
|
||||
node, err := GetNode(item.NodeId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
item.NodeKey = node.Key
|
||||
if err := item.Save(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -104,9 +140,15 @@ func AddSchedule(item Schedule) error {
|
||||
s, c := database.GetCol("schedules")
|
||||
defer s.Close()
|
||||
|
||||
node, err := GetNode(item.NodeId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
item.Id = bson.NewObjectId()
|
||||
item.CreateTs = time.Now()
|
||||
item.UpdateTs = time.Now()
|
||||
item.NodeKey = node.Key
|
||||
|
||||
if err := c.Insert(&item); err != nil {
|
||||
debug.PrintStack()
|
||||
|
||||
@@ -98,13 +98,19 @@ func (spider *Spider) GetLastTask() (Task, error) {
|
||||
return tasks[0], nil
|
||||
}
|
||||
|
||||
func (spider *Spider) Delete() error {
|
||||
s, c := database.GetCol("spiders")
|
||||
defer s.Close()
|
||||
return c.RemoveId(spider.Id)
|
||||
}
|
||||
|
||||
// 爬虫列表
|
||||
func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, int, error) {
|
||||
s, c := database.GetCol("spiders")
|
||||
defer s.Close()
|
||||
|
||||
// 获取爬虫列表
|
||||
spiders := []Spider{}
|
||||
var spiders []Spider
|
||||
if err := c.Find(filter).Skip(skip).Limit(limit).Sort("+name").All(&spiders); err != nil {
|
||||
debug.PrintStack()
|
||||
return spiders, 0, err
|
||||
@@ -225,7 +231,7 @@ func RemoveAllSpider() error {
|
||||
s, c := database.GetCol("spiders")
|
||||
defer s.Close()
|
||||
|
||||
spiders := []Spider{}
|
||||
var spiders []Spider
|
||||
err := c.Find(nil).All(&spiders)
|
||||
if err != nil {
|
||||
log.Error("get all spiders error:" + err.Error())
|
||||
@@ -256,15 +262,14 @@ func GetSpiderTypes() ([]*entity.SpiderType, error) {
|
||||
s, c := database.GetCol("spiders")
|
||||
defer s.Close()
|
||||
|
||||
|
||||
group := bson.M{
|
||||
"$group": bson.M{
|
||||
"_id": "$type",
|
||||
"_id": "$type",
|
||||
"count": bson.M{"$sum": 1},
|
||||
},
|
||||
}
|
||||
var types []*entity.SpiderType
|
||||
if err := c.Pipe([]bson.M{ group}).All(&types); err != nil {
|
||||
if err := c.Pipe([]bson.M{group}).All(&types); err != nil {
|
||||
log.Errorf("get spider types error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return nil, err
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"crawlab/constants"
|
||||
"crawlab/database"
|
||||
"github.com/apex/log"
|
||||
"github.com/globalsign/mgo"
|
||||
"github.com/globalsign/mgo/bson"
|
||||
"runtime/debug"
|
||||
"time"
|
||||
@@ -118,20 +117,16 @@ func GetTaskList(filter interface{}, skip int, limit int, sortKey string) ([]Tas
|
||||
for i, task := range tasks {
|
||||
// 获取爬虫名称
|
||||
spider, err := task.GetSpider()
|
||||
if err == mgo.ErrNotFound {
|
||||
// do nothing
|
||||
} else if err != nil {
|
||||
return tasks, err
|
||||
if spider.Id.Hex() == "" || err != nil {
|
||||
_ = spider.Delete()
|
||||
} else {
|
||||
tasks[i].SpiderName = spider.DisplayName
|
||||
}
|
||||
|
||||
// 获取节点名称
|
||||
node, err := task.GetNode()
|
||||
if err == mgo.ErrNotFound {
|
||||
// do nothing
|
||||
} else if err != nil {
|
||||
return tasks, err
|
||||
if node.Id.Hex() == "" || err != nil {
|
||||
_ = task.Delete()
|
||||
} else {
|
||||
tasks[i].NodeName = node.Name
|
||||
}
|
||||
|
||||
@@ -15,9 +15,9 @@ func GetNodeList(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
for i, node := range nodes {
|
||||
nodes[i].IsMaster = services.IsMasterNode(node.Id.Hex())
|
||||
}
|
||||
//for i, node := range nodes {
|
||||
// nodes[i].IsMaster = services.IsMasterNode(node.Id.Hex())
|
||||
//}
|
||||
|
||||
c.JSON(http.StatusOK, Response{
|
||||
Status: "ok",
|
||||
@@ -109,11 +109,11 @@ func GetSystemInfo(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
|
||||
func DeleteNode(c *gin.Context) {
|
||||
func DeleteNode(c *gin.Context) {
|
||||
id := c.Param("id")
|
||||
node, err := model.GetNode(bson.ObjectIdHex(id))
|
||||
if err != nil {
|
||||
HandleError(http.StatusInternalServerError, c ,err)
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
return
|
||||
}
|
||||
err = node.Delete()
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package routes
|
||||
|
||||
import (
|
||||
"crawlab/constants"
|
||||
"crawlab/model"
|
||||
"crawlab/services"
|
||||
"github.com/gin-gonic/gin"
|
||||
@@ -46,13 +45,14 @@ func PostSchedule(c *gin.Context) {
|
||||
HandleError(http.StatusBadRequest, c, err)
|
||||
return
|
||||
}
|
||||
newItem.Id = bson.ObjectIdHex(id)
|
||||
|
||||
// 如果node_id为空,则置为空ObjectId
|
||||
if newItem.NodeId == "" {
|
||||
newItem.NodeId = bson.ObjectIdHex(constants.ObjectIdNull)
|
||||
// 验证cron表达式
|
||||
if err := services.ParserCron(newItem.Cron); err != nil {
|
||||
HandleError(http.StatusOK, c, err)
|
||||
return
|
||||
}
|
||||
|
||||
newItem.Id = bson.ObjectIdHex(id)
|
||||
// 更新数据库
|
||||
if err := model.UpdateSchedule(bson.ObjectIdHex(id), newItem); err != nil {
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
@@ -80,9 +80,10 @@ func PutSchedule(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// 如果node_id为空,则置为空ObjectId
|
||||
if item.NodeId == "" {
|
||||
item.NodeId = bson.ObjectIdHex(constants.ObjectIdNull)
|
||||
// 验证cron表达式
|
||||
if err := services.ParserCron(item.Cron); err != nil {
|
||||
HandleError(http.StatusOK, c, err)
|
||||
return
|
||||
}
|
||||
|
||||
// 更新数据库
|
||||
|
||||
@@ -66,7 +66,7 @@ func (c *Context) failed(err error, httpCode int, variables ...interface{}) {
|
||||
"message": "error",
|
||||
"error": errStr,
|
||||
})
|
||||
break
|
||||
|
||||
case validator.ValidationErrors:
|
||||
validatorErrors := causeError.(validator.ValidationErrors)
|
||||
//firstError := validatorErrors[0].(validator.FieldError)
|
||||
@@ -75,7 +75,6 @@ func (c *Context) failed(err error, httpCode int, variables ...interface{}) {
|
||||
"message": "error",
|
||||
"error": validatorErrors.Error(),
|
||||
})
|
||||
break
|
||||
default:
|
||||
fmt.Println("deprecated....")
|
||||
c.AbortWithStatusJSON(httpCode, gin.H{
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime/debug"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 任务日志频道映射
|
||||
@@ -45,8 +46,14 @@ func GetRemoteLog(task model.Task) (logStr string, err error) {
|
||||
// 生成频道,等待获取log
|
||||
ch := TaskLogChanMap.ChanBlocked(task.Id)
|
||||
|
||||
// 此处阻塞,等待结果
|
||||
logStr = <-ch
|
||||
select {
|
||||
case logStr = <-ch:
|
||||
log.Infof("get remote log")
|
||||
break
|
||||
case <-time.After(30 * time.Second):
|
||||
logStr = "get remote log timeout"
|
||||
break
|
||||
}
|
||||
|
||||
return logStr, nil
|
||||
}
|
||||
@@ -67,7 +74,7 @@ func DeleteLogPeriodically() {
|
||||
for _, fi := range rd {
|
||||
if fi.IsDir() {
|
||||
log.Info(filepath.Join(logDir, fi.Name()))
|
||||
os.RemoveAll(filepath.Join(logDir, fi.Name()))
|
||||
_ = os.RemoveAll(filepath.Join(logDir, fi.Name()))
|
||||
log.Info("Delete Log File Success")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,10 +13,8 @@ import (
|
||||
|
||||
func TestDeleteLogPeriodically(t *testing.T) {
|
||||
Convey("Test DeleteLogPeriodically", t, func() {
|
||||
if err := config.InitConfig("../conf/config.yml"); err != nil {
|
||||
log.Error("init config error:" + err.Error())
|
||||
panic(err)
|
||||
}
|
||||
err := config.InitConfig("../conf/config.yml")
|
||||
So(err, ShouldBeNil)
|
||||
log.Info("初始化配置成功")
|
||||
logDir := viper.GetString("log.path")
|
||||
log.Info(logDir)
|
||||
@@ -28,24 +26,16 @@ func TestGetLocalLog(t *testing.T) {
|
||||
//create a log file for test
|
||||
logPath := "../logs/crawlab/test.log"
|
||||
f, err := os.Create(logPath)
|
||||
defer f.Close()
|
||||
defer utils.Close(f)
|
||||
if err != nil {
|
||||
fmt.Println(err.Error())
|
||||
fmt.Println(err)
|
||||
|
||||
} else {
|
||||
_, err = f.WriteString("This is for test")
|
||||
fmt.Println(err)
|
||||
}
|
||||
|
||||
Convey("Test GetLocalLog", t, func() {
|
||||
Convey("Test response", func() {
|
||||
logStr, err := GetLocalLog(logPath)
|
||||
log.Info(utils.BytesToString(logStr))
|
||||
fmt.Println(err)
|
||||
So(err, ShouldEqual, nil)
|
||||
|
||||
})
|
||||
})
|
||||
//delete the test log file
|
||||
os.Remove(logPath)
|
||||
_ = os.Remove(logPath)
|
||||
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package msg_handler
|
||||
import (
|
||||
"crawlab/constants"
|
||||
"crawlab/entity"
|
||||
"github.com/apex/log"
|
||||
)
|
||||
|
||||
type Handler interface {
|
||||
@@ -10,6 +11,7 @@ type Handler interface {
|
||||
}
|
||||
|
||||
func GetMsgHandler(msg entity.NodeMessage) Handler {
|
||||
log.Infof("received msg , type is : %s", msg.Type)
|
||||
if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog {
|
||||
// 日志相关
|
||||
return &Log{
|
||||
|
||||
@@ -2,6 +2,7 @@ package msg_handler
|
||||
|
||||
import (
|
||||
"crawlab/constants"
|
||||
"crawlab/database"
|
||||
"crawlab/entity"
|
||||
"crawlab/model"
|
||||
"crawlab/utils"
|
||||
@@ -40,8 +41,11 @@ func (g *Log) get() error {
|
||||
}
|
||||
// 发布消息给主节点
|
||||
if err := utils.Pub(constants.ChannelMasterNode, msgSd); err != nil {
|
||||
log.Errorf("pub log to master node error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
log.Infof(msgSd.Log)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -2,9 +2,9 @@ package msg_handler
|
||||
|
||||
import (
|
||||
"crawlab/constants"
|
||||
"crawlab/database"
|
||||
"crawlab/entity"
|
||||
"crawlab/model"
|
||||
"crawlab/utils"
|
||||
)
|
||||
|
||||
type SystemInfo struct {
|
||||
@@ -22,7 +22,7 @@ func (s *SystemInfo) Handle() error {
|
||||
NodeId: s.msg.NodeId,
|
||||
SysInfo: sysInfo,
|
||||
}
|
||||
if err := utils.Pub(constants.ChannelMasterNode, msgSd); err != nil {
|
||||
if err := database.Pub(constants.ChannelMasterNode, msgSd); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -88,6 +88,8 @@ func UpdateNodeStatus() {
|
||||
handleNodeInfo(key, data)
|
||||
}
|
||||
|
||||
// 重新获取list
|
||||
list, _ = database.RedisClient.HKeys("nodes")
|
||||
// 重置不在redis的key为offline
|
||||
model.ResetNodeStatusToOffline(list)
|
||||
}
|
||||
@@ -100,7 +102,7 @@ func handleNodeInfo(key string, data Data) {
|
||||
// 同个key可能因为并发,被注册多次
|
||||
var nodes []model.Node
|
||||
_ = c.Find(bson.M{"key": key}).All(&nodes)
|
||||
if nodes != nil && len(nodes) > 1 {
|
||||
if len(nodes) > 1 {
|
||||
for _, node := range nodes {
|
||||
_ = c.RemoveId(node.Id)
|
||||
}
|
||||
@@ -110,13 +112,15 @@ func handleNodeInfo(key string, data Data) {
|
||||
if err := c.Find(bson.M{"key": key}).One(&node); err != nil {
|
||||
// 数据库不存在该节点
|
||||
node = model.Node{
|
||||
Key: key,
|
||||
Name: data.Ip,
|
||||
Ip: data.Ip,
|
||||
Port: "8000",
|
||||
Mac: data.Mac,
|
||||
Status: constants.StatusOnline,
|
||||
IsMaster: data.Master,
|
||||
Key: key,
|
||||
Name: data.Ip,
|
||||
Ip: data.Ip,
|
||||
Port: "8000",
|
||||
Mac: data.Mac,
|
||||
Status: constants.StatusOnline,
|
||||
IsMaster: data.Master,
|
||||
UpdateTs: time.Now(),
|
||||
UpdateTsUnix: time.Now().Unix(),
|
||||
}
|
||||
if err := node.Add(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
@@ -125,6 +129,8 @@ func handleNodeInfo(key string, data Data) {
|
||||
} else {
|
||||
// 数据库存在该节点
|
||||
node.Status = constants.StatusOnline
|
||||
node.UpdateTs = time.Now()
|
||||
node.UpdateTsUnix = time.Now().Unix()
|
||||
if err := node.Save(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
@@ -149,7 +155,11 @@ func UpdateNodeData() {
|
||||
}
|
||||
// 获取redis的key
|
||||
key, err := register.GetRegister().GetKey()
|
||||
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
// 构造节点数据
|
||||
data := Data{
|
||||
Key: key,
|
||||
@@ -201,6 +211,8 @@ func WorkerNodeCallback(message redis.Message) (err error) {
|
||||
// 反序列化
|
||||
msg := utils.GetMessage(message)
|
||||
if err := msg_handler.GetMsgHandler(*msg).Handle(); err != nil {
|
||||
log.Errorf("msg handler error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -230,19 +242,19 @@ func InitNodeService() error {
|
||||
|
||||
if model.IsMaster() {
|
||||
// 如果为主节点,订阅主节点通信频道
|
||||
if err := utils.Sub(constants.ChannelMasterNode, MasterNodeCallback); err != nil {
|
||||
if err := database.Sub(constants.ChannelMasterNode, MasterNodeCallback); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// 若为工作节点,订阅单独指定通信频道
|
||||
channel := constants.ChannelWorkerNode + node.Id.Hex()
|
||||
if err := utils.Sub(channel, WorkerNodeCallback); err != nil {
|
||||
if err := database.Sub(channel, WorkerNodeCallback); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// 订阅全通道
|
||||
if err := utils.Sub(constants.ChannelAllNode, WorkerNodeCallback); err != nil {
|
||||
if err := database.Sub(constants.ChannelAllNode, WorkerNodeCallback); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"crawlab/lib/cron"
|
||||
"crawlab/model"
|
||||
"github.com/apex/log"
|
||||
uuid "github.com/satori/go.uuid"
|
||||
"github.com/satori/go.uuid"
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
@@ -17,7 +17,22 @@ type Scheduler struct {
|
||||
|
||||
func AddTask(s model.Schedule) func() {
|
||||
return func() {
|
||||
nodeId := s.NodeId
|
||||
node, err := model.GetNodeByKey(s.NodeKey)
|
||||
if err != nil || node.Id.Hex() == "" {
|
||||
log.Errorf("get node by key error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
|
||||
spider := model.GetSpiderByName(s.SpiderName)
|
||||
if spider == nil || spider.Id.Hex() == "" {
|
||||
log.Errorf("get spider by name error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
|
||||
// 同步ID到定时任务
|
||||
s.SyncNodeIdAndSpiderId(node, *spider)
|
||||
|
||||
// 生成任务ID
|
||||
id := uuid.NewV4()
|
||||
@@ -25,8 +40,8 @@ func AddTask(s model.Schedule) func() {
|
||||
// 生成任务模型
|
||||
t := model.Task{
|
||||
Id: id.String(),
|
||||
SpiderId: s.SpiderId,
|
||||
NodeId: nodeId,
|
||||
SpiderId: spider.Id,
|
||||
NodeId: node.Id,
|
||||
Status: constants.StatusPending,
|
||||
Param: s.Param,
|
||||
}
|
||||
@@ -107,6 +122,18 @@ func (s *Scheduler) RemoveAll() {
|
||||
}
|
||||
}
|
||||
|
||||
// 验证cron表达式是否正确
|
||||
func ParserCron(spec string) error {
|
||||
parser := cron.NewParser(
|
||||
cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
|
||||
)
|
||||
|
||||
if _, err := parser.Parse(spec); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Scheduler) Update() error {
|
||||
// 删除所有定时任务
|
||||
s.RemoveAll()
|
||||
|
||||
@@ -82,7 +82,7 @@ func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCre
|
||||
log.Infof("can't opened this file")
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
defer utils.Close(f)
|
||||
s := make([]byte, 4096)
|
||||
for {
|
||||
switch nr, err := f.Read(s[:]); true {
|
||||
@@ -173,7 +173,7 @@ func RemoveSpider(id string) error {
|
||||
Type: constants.MsgTypeRemoveSpider,
|
||||
SpiderId: id,
|
||||
}
|
||||
if err := utils.Pub(constants.ChannelAllNode, msg); err != nil {
|
||||
if err := database.Pub(constants.ChannelAllNode, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ func (s *SpiderSync) CreateMd5File(md5 string) {
|
||||
|
||||
fileName := filepath.Join(path, Md5File)
|
||||
file := utils.OpenFile(fileName)
|
||||
defer file.Close()
|
||||
defer utils.Close(file)
|
||||
if file != nil {
|
||||
if _, err := file.WriteString(md5 + "\n"); err != nil {
|
||||
log.Errorf("file write string error: %s", err.Error())
|
||||
@@ -80,7 +80,7 @@ func (s *SpiderSync) Download() {
|
||||
defer session.Close()
|
||||
|
||||
f, err := gf.OpenId(bson.ObjectIdHex(fileId))
|
||||
defer f.Close()
|
||||
defer utils.Close(f)
|
||||
if err != nil {
|
||||
log.Errorf("open file id: " + fileId + ", spider id:" + spiderId + ", error: " + err.Error())
|
||||
debug.PrintStack()
|
||||
@@ -99,7 +99,7 @@ func (s *SpiderSync) Download() {
|
||||
// 创建临时文件
|
||||
tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip")
|
||||
tmpFile := utils.OpenFile(tmpFilePath)
|
||||
defer tmpFile.Close()
|
||||
defer utils.Close(tmpFile)
|
||||
|
||||
// 将该文件写入临时文件
|
||||
if _, err := io.Copy(tmpFile, f); err != nil {
|
||||
|
||||
@@ -100,91 +100,85 @@ func AssignTask(task model.Task) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 执行shell命令
|
||||
func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (err error) {
|
||||
log.Infof("cwd: " + cwd)
|
||||
log.Infof("cmd: " + cmdStr)
|
||||
// 设置环境变量
|
||||
func SetEnv(cmd *exec.Cmd, envs []model.Env, taskId string, dataCol string) *exec.Cmd {
|
||||
// 默认环境变量
|
||||
cmd.Env = append(os.Environ(), "CRAWLAB_TASK_ID="+taskId)
|
||||
cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+dataCol)
|
||||
cmd.Env = append(cmd.Env, "PYTHONUNBUFFERED=0")
|
||||
cmd.Env = append(cmd.Env, "PYTHONIOENCODING=utf-8")
|
||||
cmd.Env = append(cmd.Env, "TZ=Asia/Shanghai")
|
||||
|
||||
// 生成执行命令
|
||||
var cmd *exec.Cmd
|
||||
if runtime.GOOS == constants.Windows {
|
||||
cmd = exec.Command("cmd", "/C", cmdStr)
|
||||
} else {
|
||||
cmd = exec.Command("sh", "-c", cmdStr)
|
||||
}
|
||||
|
||||
// 工作目录
|
||||
cmd.Dir = cwd
|
||||
|
||||
// 指定stdout, stderr日志位置
|
||||
fLog, err := os.Create(t.LogPath)
|
||||
if err != nil {
|
||||
HandleTaskError(t, err)
|
||||
return err
|
||||
}
|
||||
defer fLog.Close()
|
||||
cmd.Stdout = fLog
|
||||
cmd.Stderr = fLog
|
||||
|
||||
// 添加默认环境变量
|
||||
cmd.Env = append(cmd.Env, "CRAWLAB_TASK_ID="+t.Id)
|
||||
cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+s.Col)
|
||||
|
||||
// 添加任务环境变量
|
||||
for _, env := range s.Envs {
|
||||
//任务环境变量
|
||||
for _, env := range envs {
|
||||
cmd.Env = append(cmd.Env, env.Name+"="+env.Value)
|
||||
}
|
||||
|
||||
// 起一个goroutine来监控进程
|
||||
ch := utils.TaskExecChanMap.ChanBlocked(t.Id)
|
||||
go func() {
|
||||
// 传入信号,此处阻塞
|
||||
signal := <-ch
|
||||
log.Infof("cancel process signal: %s", signal)
|
||||
if signal == constants.TaskCancel && cmd.Process != nil {
|
||||
// 取消进程
|
||||
if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil {
|
||||
log.Errorf("process kill error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
}
|
||||
t.Status = constants.StatusCancelled
|
||||
} else {
|
||||
// 保存任务
|
||||
t.Status = constants.StatusFinished
|
||||
}
|
||||
t.FinishTs = time.Now()
|
||||
t.Error = "user kill the process ..."
|
||||
if err := t.Save(); err != nil {
|
||||
log.Infof("save task error: %s", err.Error())
|
||||
// TODO 全局环境变量
|
||||
return cmd
|
||||
}
|
||||
|
||||
func SetLogConfig(cmd *exec.Cmd, path string) error {
|
||||
fLog, err := os.Create(path)
|
||||
if err != nil {
|
||||
log.Errorf("create task log file error: %s", path)
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
cmd.Stdout = fLog
|
||||
cmd.Stderr = fLog
|
||||
return nil
|
||||
}
|
||||
|
||||
func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, t model.Task) {
|
||||
// 传入信号,此处阻塞
|
||||
signal := <-ch
|
||||
log.Infof("process received signal: %s", signal)
|
||||
|
||||
if signal == constants.TaskCancel && cmd.Process != nil {
|
||||
// 取消进程
|
||||
if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil {
|
||||
log.Errorf("process kill error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
|
||||
t.Error = "kill process error: " + err.Error()
|
||||
t.Status = constants.StatusError
|
||||
} else {
|
||||
t.Error = "user kill the process ..."
|
||||
t.Status = constants.StatusCancelled
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
// 保存任务
|
||||
t.Status = constants.StatusFinished
|
||||
}
|
||||
|
||||
// 在选择所有节点执行的时候,实际就是随机一个节点执行的,
|
||||
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
|
||||
t.FinishTs = time.Now()
|
||||
_ = t.Save()
|
||||
}
|
||||
|
||||
// 异步启动进程
|
||||
func StartTaskProcess(cmd *exec.Cmd, t model.Task) error {
|
||||
if err := cmd.Start(); err != nil {
|
||||
log.Errorf("start spider error:{}", err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
|
||||
// 保存pid到task
|
||||
t.Pid = cmd.Process.Pid
|
||||
if err := t.Save(); err != nil {
|
||||
log.Errorf("save task pid error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
t.Error = "start task error: " + err.Error()
|
||||
t.Status = constants.StatusError
|
||||
t.FinishTs = time.Now()
|
||||
_ = t.Save()
|
||||
return err
|
||||
}
|
||||
// 同步等待进程完成
|
||||
return nil
|
||||
}
|
||||
|
||||
func WaitTaskProcess(cmd *exec.Cmd, t model.Task) error {
|
||||
if err := cmd.Wait(); err != nil {
|
||||
log.Errorf("wait process finish error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
|
||||
if exitError, ok := err.(*exec.ExitError); ok {
|
||||
exitCode := exitError.ExitCode()
|
||||
log.Errorf("exit error, exit code: %d", exitCode)
|
||||
|
||||
// 非kill 的错误类型
|
||||
if exitCode != -1 {
|
||||
// 非手动kill保存为错误状态
|
||||
@@ -194,6 +188,52 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e
|
||||
_ = t.Save()
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 执行shell命令
|
||||
func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (err error) {
|
||||
log.Infof("cwd: %s", cwd)
|
||||
log.Infof("cmd: %s", cmdStr)
|
||||
|
||||
// 生成执行命令
|
||||
var cmd *exec.Cmd
|
||||
if runtime.GOOS == constants.Windows {
|
||||
cmd = exec.Command("cmd", "/C", cmdStr)
|
||||
} else {
|
||||
cmd = exec.Command("")
|
||||
cmd = exec.Command("sh", "-c", cmdStr)
|
||||
}
|
||||
|
||||
// 工作目录
|
||||
cmd.Dir = cwd
|
||||
|
||||
// 日志配置
|
||||
if err := SetLogConfig(cmd, t.LogPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 环境变量配置
|
||||
cmd = SetEnv(cmd, s.Envs, t.Id, s.Col)
|
||||
|
||||
// 起一个goroutine来监控进程
|
||||
ch := utils.TaskExecChanMap.ChanBlocked(t.Id)
|
||||
|
||||
go FinishOrCancelTask(ch, cmd, t)
|
||||
|
||||
// kill的时候,可以kill所有的子进程
|
||||
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
|
||||
|
||||
// 启动进程
|
||||
if err := StartTaskProcess(cmd, t); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 同步等待进程完成
|
||||
if err := WaitTaskProcess(cmd, t); err != nil {
|
||||
return err
|
||||
}
|
||||
ch <- constants.TaskFinish
|
||||
@@ -208,6 +248,7 @@ func MakeLogDir(t model.Task) (fileDir string, err error) {
|
||||
// 如果日志目录不存在,生成该目录
|
||||
if !utils.Exists(fileDir) {
|
||||
if err := os.MkdirAll(fileDir, 0777); err != nil {
|
||||
log.Errorf("execute task, make log dir error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return "", err
|
||||
}
|
||||
@@ -272,82 +313,55 @@ func ExecuteTask(id int) {
|
||||
// 获取当前节点
|
||||
node, err := model.GetCurrentNode()
|
||||
if err != nil {
|
||||
log.Errorf(GetWorkerPrefix(id) + err.Error())
|
||||
log.Errorf("execute task get current node error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
|
||||
// 公共队列
|
||||
queuePub := "tasks:public"
|
||||
|
||||
// 节点队列
|
||||
queueCur := "tasks:node:" + node.Id.Hex()
|
||||
|
||||
// 节点队列任务
|
||||
var msg string
|
||||
msg, err = database.RedisClient.LPop(queueCur)
|
||||
if err != nil {
|
||||
if msg == "" {
|
||||
// 节点队列没有任务,获取公共队列任务
|
||||
msg, err = database.RedisClient.LPop(queuePub)
|
||||
if err != nil {
|
||||
if msg == "" {
|
||||
// 公共队列没有任务
|
||||
log.Debugf(GetWorkerPrefix(id) + "没有任务...")
|
||||
return
|
||||
} else {
|
||||
log.Errorf(GetWorkerPrefix(id) + err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Errorf(GetWorkerPrefix(id) + err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
if msg, err = database.RedisClient.LPop(queueCur); err != nil {
|
||||
// 节点队列没有任务,获取公共队列任务
|
||||
queuePub := "tasks:public"
|
||||
if msg, err = database.RedisClient.LPop(queuePub); err != nil {
|
||||
}
|
||||
}
|
||||
|
||||
if msg == "" {
|
||||
return
|
||||
}
|
||||
|
||||
// 反序列化
|
||||
tMsg := TaskMessage{}
|
||||
if err := json.Unmarshal([]byte(msg), &tMsg); err != nil {
|
||||
log.Errorf(GetWorkerPrefix(id) + err.Error())
|
||||
debug.PrintStack()
|
||||
log.Errorf("json string to struct error: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 获取任务
|
||||
t, err := model.GetTask(tMsg.Id)
|
||||
if err != nil {
|
||||
log.Errorf(GetWorkerPrefix(id) + err.Error())
|
||||
log.Errorf("execute task, get task error: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 获取爬虫
|
||||
spider, err := t.GetSpider()
|
||||
if err != nil {
|
||||
log.Errorf(GetWorkerPrefix(id) + err.Error())
|
||||
log.Errorf("execute task, get spider error: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 创建日志目录
|
||||
fileDir, err := MakeLogDir(t)
|
||||
if err != nil {
|
||||
log.Errorf(GetWorkerPrefix(id) + err.Error())
|
||||
var fileDir string
|
||||
if fileDir, err = MakeLogDir(t); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 获取日志文件路径
|
||||
t.LogPath = GetLogFilePaths(fileDir)
|
||||
|
||||
// 创建日志目录文件夹
|
||||
fileStdoutDir := filepath.Dir(t.LogPath)
|
||||
if !utils.Exists(fileStdoutDir) {
|
||||
if err := os.MkdirAll(fileStdoutDir, os.ModePerm); err != nil {
|
||||
log.Errorf(GetWorkerPrefix(id) + err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// 工作目录
|
||||
cwd := filepath.Join(
|
||||
viper.GetString("spider.path"),
|
||||
@@ -432,33 +446,29 @@ func ExecuteTask(id int) {
|
||||
|
||||
func GetTaskLog(id string) (logStr string, err error) {
|
||||
task, err := model.GetTask(id)
|
||||
|
||||
if err != nil {
|
||||
return "", err
|
||||
return
|
||||
}
|
||||
|
||||
logStr = ""
|
||||
if IsMasterNode(task.NodeId.Hex()) {
|
||||
// 若为主节点,获取本机日志
|
||||
logBytes, err := model.GetLocalLog(task.LogPath)
|
||||
logStr = utils.BytesToString(logBytes)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
logStr = err.Error()
|
||||
// return "", err
|
||||
} else {
|
||||
logStr = utils.BytesToString(logBytes)
|
||||
}
|
||||
|
||||
} else {
|
||||
// 若不为主节点,获取远端日志
|
||||
logStr, err = GetRemoteLog(task)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return "", err
|
||||
}
|
||||
return logStr, err
|
||||
}
|
||||
// 若不为主节点,获取远端日志
|
||||
logStr, err = GetRemoteLog(task)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
|
||||
return logStr, nil
|
||||
}
|
||||
return logStr, err
|
||||
}
|
||||
|
||||
func CancelTask(id string) (err error) {
|
||||
|
||||
@@ -17,9 +17,7 @@ func InitUserService() error {
|
||||
Password: utils.EncryptPassword("admin"),
|
||||
Role: constants.RoleAdmin,
|
||||
}
|
||||
if err := adminUser.Add(); err != nil {
|
||||
// pass
|
||||
}
|
||||
_ = adminUser.Add()
|
||||
return nil
|
||||
}
|
||||
func MakeToken(user *model.User) (tokenStr string, err error) {
|
||||
|
||||
@@ -21,7 +21,7 @@ func RemoveFiles(path string) {
|
||||
// 读取文件一行
|
||||
func ReadFileOneLine(fileName string) string {
|
||||
file := OpenFile(fileName)
|
||||
defer file.Close()
|
||||
defer Close(file)
|
||||
buf := bufio.NewReader(file)
|
||||
line, err := buf.ReadString('\n')
|
||||
if err != nil {
|
||||
@@ -57,10 +57,7 @@ func CreateFilePath(filePath string) {
|
||||
func Exists(path string) bool {
|
||||
_, err := os.Stat(path) //os.Stat获取文件信息
|
||||
if err != nil {
|
||||
if os.IsExist(err) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
return os.IsExist(err)
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -88,7 +85,7 @@ func DeCompressByPath(tarFile, dest string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer srcFile.Close()
|
||||
defer Close(srcFile)
|
||||
return DeCompress(srcFile, dest)
|
||||
}
|
||||
|
||||
@@ -112,7 +109,7 @@ func DeCompress(srcFile *os.File, dstPath string) error {
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
defer zipFile.Close()
|
||||
defer Close(zipFile)
|
||||
|
||||
// 遍历zip内所有文件和目录
|
||||
for _, innerFile := range zipFile.File {
|
||||
@@ -156,7 +153,7 @@ func DeCompress(srcFile *os.File, dstPath string) error {
|
||||
debug.PrintStack()
|
||||
continue
|
||||
}
|
||||
defer newFile.Close()
|
||||
defer Close(newFile)
|
||||
|
||||
// 拷贝该文件到新文件中
|
||||
if _, err := io.Copy(newFile, srcFile); err != nil {
|
||||
@@ -184,9 +181,9 @@ func DeCompress(srcFile *os.File, dstPath string) error {
|
||||
//dest 压缩文件存放地址
|
||||
func Compress(files []*os.File, dest string) error {
|
||||
d, _ := os.Create(dest)
|
||||
defer d.Close()
|
||||
defer Close(d)
|
||||
w := zip.NewWriter(d)
|
||||
defer w.Close()
|
||||
defer Close(w)
|
||||
for _, file := range files {
|
||||
err := _Compress(file, "", w)
|
||||
if err != nil {
|
||||
@@ -234,7 +231,7 @@ func _Compress(file *os.File, prefix string, zw *zip.Writer) error {
|
||||
return err
|
||||
}
|
||||
_, err = io.Copy(writer, file)
|
||||
file.Close()
|
||||
Close(file)
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
return err
|
||||
|
||||
@@ -60,7 +60,7 @@ func TestCompress(t *testing.T) {
|
||||
So(er, ShouldEqual, nil)
|
||||
})
|
||||
})
|
||||
os.RemoveAll("testCompress")
|
||||
_ = os.RemoveAll("testCompress")
|
||||
|
||||
}
|
||||
func Zip(zipFile string, fileList []string) error {
|
||||
@@ -69,16 +69,11 @@ func Zip(zipFile string, fileList []string) error {
|
||||
if err != nil {
|
||||
log.Fatal()
|
||||
}
|
||||
defer fw.Close()
|
||||
defer Close(fw)
|
||||
|
||||
// 实例化新的 zip.Writer
|
||||
zw := zip.NewWriter(fw)
|
||||
defer func() {
|
||||
// 检测一下是否成功关闭
|
||||
if err := zw.Close(); err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
}()
|
||||
defer Close(zw)
|
||||
|
||||
for _, fileName := range fileList {
|
||||
fr, err := os.Open(fileName)
|
||||
@@ -91,6 +86,9 @@ func Zip(zipFile string, fileList []string) error {
|
||||
}
|
||||
// 写入文件的头信息
|
||||
fh, err := zip.FileInfoHeader(fi)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w, err := zw.CreateHeader(fh)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -106,6 +104,10 @@ func Zip(zipFile string, fileList []string) error {
|
||||
|
||||
func TestDeCompress(t *testing.T) {
|
||||
err := os.Mkdir("testDeCompress", os.ModePerm)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
||||
}
|
||||
err = Zip("demo.zip", []string{})
|
||||
if err != nil {
|
||||
t.Error("create zip file failed")
|
||||
@@ -121,7 +123,7 @@ func TestDeCompress(t *testing.T) {
|
||||
err := DeCompress(tmpFile, dstPath)
|
||||
So(err, ShouldEqual, nil)
|
||||
})
|
||||
os.RemoveAll("testDeCompress")
|
||||
os.Remove("demo.zip")
|
||||
_ = os.RemoveAll("testDeCompress")
|
||||
_ = os.Remove("demo.zip")
|
||||
|
||||
}
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crawlab/database"
|
||||
"crawlab/entity"
|
||||
"encoding/json"
|
||||
"github.com/apex/log"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"io"
|
||||
"runtime/debug"
|
||||
"unsafe"
|
||||
)
|
||||
@@ -35,21 +34,9 @@ func GetMessage(message redis.Message) *entity.NodeMessage {
|
||||
return &msg
|
||||
}
|
||||
|
||||
func Pub(channel string, msg entity.NodeMessage) error {
|
||||
if _, err := database.RedisClient.Publish(channel, GetJson(msg)); err != nil {
|
||||
log.Errorf("publish redis error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
func Close(c io.Closer) {
|
||||
err := c.Close()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("关闭资源文件失败。")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Sub(channel string, consume database.ConsumeFunc) error {
|
||||
ctx := context.Background()
|
||||
if err := database.RedisClient.Subscribe(ctx, consume, channel); err != nil {
|
||||
log.Errorf("subscribe redis error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -12,15 +12,16 @@ func IsObjectIdNull(id bson.ObjectId) bool {
|
||||
}
|
||||
|
||||
func InterfaceToString(value interface{}) string {
|
||||
switch value.(type) {
|
||||
switch realValue := value.(type) {
|
||||
case bson.ObjectId:
|
||||
return value.(bson.ObjectId).Hex()
|
||||
return realValue.Hex()
|
||||
case string:
|
||||
return value.(string)
|
||||
return realValue
|
||||
case int:
|
||||
return strconv.Itoa(value.(int))
|
||||
return strconv.Itoa(realValue)
|
||||
case time.Time:
|
||||
return value.(time.Time).String()
|
||||
return realValue.String()
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
|
||||
func EncryptPassword(str string) string {
|
||||
w := md5.New()
|
||||
io.WriteString(w, str)
|
||||
_, _ = io.WriteString(w, str)
|
||||
md5str := fmt.Sprintf("%x", w.Sum(nil))
|
||||
return md5str
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ then
|
||||
:
|
||||
else
|
||||
jspath=`ls /app/dist/js/app.*.js`
|
||||
sed -i "s?localhost:8000?${CRAWLAB_API_ADDRESS}?g" ${jspath}
|
||||
sed -i "s?http://localhost:8000?${CRAWLAB_API_ADDRESS}?g" ${jspath}
|
||||
fi
|
||||
|
||||
# replace base url
|
||||
|
||||
@@ -9,9 +9,8 @@
|
||||
<el-form label-width="80px">
|
||||
<el-form-item :label="$t('Node')">
|
||||
<el-select v-model="nodeId">
|
||||
<el-option value="" :label="$t('All Nodes')"/>
|
||||
<el-option
|
||||
v-for="op in $store.state.node.nodeList"
|
||||
v-for="op in nodeList"
|
||||
:key="op._id"
|
||||
:value="op._id"
|
||||
:disabled="op.status !== 'online'"
|
||||
@@ -31,6 +30,7 @@
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import request from '../../api/request'
|
||||
export default {
|
||||
name: 'CrawlConfirmDialog',
|
||||
props: {
|
||||
@@ -46,7 +46,8 @@ export default {
|
||||
data () {
|
||||
return {
|
||||
nodeId: '',
|
||||
param: ''
|
||||
param: '',
|
||||
nodeList: []
|
||||
}
|
||||
},
|
||||
methods: {
|
||||
@@ -61,6 +62,20 @@ export default {
|
||||
this.$emit('close')
|
||||
this.$st.sendEv('爬虫', '运行')
|
||||
}
|
||||
},
|
||||
created () {
|
||||
// 节点列表
|
||||
request.get('/nodes', {}).then(response => {
|
||||
this.nodeList = response.data.data.map(d => {
|
||||
d.systemInfo = {
|
||||
os: '',
|
||||
arch: '',
|
||||
num_cpu: '',
|
||||
executables: []
|
||||
}
|
||||
return d
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
</script>
|
||||
|
||||
@@ -14,9 +14,9 @@
|
||||
<el-form-item :label="$t('Schedule Name')" prop="name" required>
|
||||
<el-input v-model="scheduleForm.name" :placeholder="$t('Schedule Name')"></el-input>
|
||||
</el-form-item>
|
||||
<el-form-item :label="$t('Node')" prop="node_id">
|
||||
<el-form-item :label="$t('Node')" prop="node_id" required>
|
||||
<el-select v-model="scheduleForm.node_id">
|
||||
<el-option :label="$t('All Nodes')" value="000000000000000000000000"></el-option>
|
||||
<!--<el-option :label="$t('All Nodes')" value="000000000000000000000000"></el-option>-->
|
||||
<el-option
|
||||
v-for="op in nodeList"
|
||||
:key="op._id"
|
||||
@@ -38,21 +38,22 @@
|
||||
</el-option>
|
||||
</el-select>
|
||||
</el-form-item>
|
||||
<el-form-item :label="$t('schedules.cron')" prop="cron" :rules="cronRules" required>
|
||||
<template slot="label">
|
||||
<el-tooltip :content="$t('schedules.cron_format')"
|
||||
placement="top">
|
||||
<span>
|
||||
{{$t('schedules.cron')}}
|
||||
<i class="fa fa-exclamation-circle"></i>
|
||||
</span>
|
||||
</el-tooltip>
|
||||
</template>
|
||||
<el-input style="width:calc(100% - 100px);padding-right:10px"
|
||||
<!--:rules="cronRules"-->
|
||||
<el-form-item :label="$t('schedules.cron')" prop="cron" required>
|
||||
<!--<template slot="label">-->
|
||||
<!--<el-tooltip :content="$t('schedules.cron_format')"-->
|
||||
<!--placement="top">-->
|
||||
<!--<span>-->
|
||||
<!--{{$t('schedules.cron')}}-->
|
||||
<!--<i class="fa fa-exclamation-circle"></i>-->
|
||||
<!--</span>-->
|
||||
<!--</el-tooltip>-->
|
||||
<!--</template>-->
|
||||
<el-input style="padding-right:10px"
|
||||
v-model="scheduleForm.cron"
|
||||
:placeholder="$t('schedules.cron')">
|
||||
</el-input>
|
||||
<el-button size="small" style="width:100px" type="primary" @click="onShowCronDialog">{{$t('schedules.add_cron')}}</el-button>
|
||||
<!--<el-button size="small" style="width:100px" type="primary" @click="onShowCronDialog">{{$t('schedules.add_cron')}}</el-button>-->
|
||||
</el-form-item>
|
||||
<el-form-item :label="$t('Execute Command')" prop="params">
|
||||
<el-input v-model="spider.cmd"
|
||||
@@ -69,6 +70,7 @@
|
||||
:placeholder="$t('Schedule Description')"></el-input>
|
||||
</el-form-item>
|
||||
</el-form>
|
||||
<!--取消、保存-->
|
||||
<span slot="footer" class="dialog-footer">
|
||||
<el-button size="small" @click="onCancel">{{$t('Cancel')}}</el-button>
|
||||
<el-button size="small" type="primary" @click="onAddSubmit">{{$t('Submit')}}</el-button>
|
||||
@@ -76,9 +78,9 @@
|
||||
</el-dialog>
|
||||
|
||||
<!--cron generation popup-->
|
||||
<el-dialog title="生成 Cron" :visible.sync="showCron">
|
||||
<vcrontab @hide="showCron=false" @fill="onCrontabFill" :expression="expression"></vcrontab>
|
||||
</el-dialog>
|
||||
<!--<el-dialog title="生成 Cron" :visible.sync="showCron">-->
|
||||
<!--<vcrontab @hide="showCron=false" @fill="onCrontabFill" :expression="expression"></vcrontab>-->
|
||||
<!--</el-dialog>-->
|
||||
|
||||
<el-card style="border-radius: 0">
|
||||
<!--filter-->
|
||||
@@ -131,28 +133,15 @@
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import vcrontab from 'vcrontab'
|
||||
// import vcrontab from 'vcrontab'
|
||||
import request from '../../api/request'
|
||||
import {
|
||||
mapState
|
||||
} from 'vuex'
|
||||
|
||||
export default {
|
||||
name: 'ScheduleList',
|
||||
components: { vcrontab },
|
||||
data () {
|
||||
const cronValidator = (rule, value, callback) => {
|
||||
let patArr = []
|
||||
for (let i = 0; i < 6; i++) {
|
||||
patArr.push('[/*,0-9-]+')
|
||||
}
|
||||
const pat = '^' + patArr.join(' ') + '( [/*,0-9-]+)?' + '$'
|
||||
if (!value) {
|
||||
callback(new Error('cron cannot be empty'))
|
||||
} else if (!value.match(pat)) {
|
||||
callback(new Error('cron format is invalid'))
|
||||
}
|
||||
callback()
|
||||
}
|
||||
return {
|
||||
columns: [
|
||||
{ name: 'name', label: 'Name', width: '180' },
|
||||
@@ -165,11 +154,10 @@ export default {
|
||||
isEdit: false,
|
||||
dialogTitle: '',
|
||||
dialogVisible: false,
|
||||
cronRules: [
|
||||
{ validator: cronValidator, trigger: 'blur' }
|
||||
],
|
||||
showCron: false,
|
||||
expression: ''
|
||||
expression: '',
|
||||
spiderList: [],
|
||||
nodeList: []
|
||||
}
|
||||
},
|
||||
computed: {
|
||||
@@ -177,12 +165,6 @@ export default {
|
||||
'scheduleList',
|
||||
'scheduleForm'
|
||||
]),
|
||||
...mapState('spider', [
|
||||
'spiderList'
|
||||
]),
|
||||
...mapState('node', [
|
||||
'nodeList'
|
||||
]),
|
||||
filteredTableData () {
|
||||
return this.scheduleList
|
||||
},
|
||||
@@ -211,19 +193,25 @@ export default {
|
||||
onAddSubmit () {
|
||||
this.$refs.scheduleForm.validate(res => {
|
||||
if (res) {
|
||||
let action
|
||||
if (this.isEdit) {
|
||||
action = 'editSchedule'
|
||||
} else {
|
||||
action = 'addSchedule'
|
||||
}
|
||||
this.$store.dispatch('schedule/' + action, this.scheduleForm._id)
|
||||
.then(() => {
|
||||
request.post(`/schedules/${this.scheduleForm._id}`, this.scheduleForm).then(response => {
|
||||
if (response.data.error) {
|
||||
this.$message.error(response.data.error)
|
||||
return
|
||||
}
|
||||
this.dialogVisible = false
|
||||
setTimeout(() => {
|
||||
this.$store.dispatch('schedule/getScheduleList')
|
||||
}, 100)
|
||||
this.$store.dispatch('schedule/getScheduleList')
|
||||
})
|
||||
} else {
|
||||
request.put('/schedules', this.scheduleForm).then(response => {
|
||||
if (response.data.error) {
|
||||
this.$message.error(response.data.error)
|
||||
return
|
||||
}
|
||||
this.dialogVisible = false
|
||||
this.$store.dispatch('schedule/getScheduleList')
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
this.$st.sendEv('定时任务', '提交')
|
||||
@@ -269,8 +257,25 @@ export default {
|
||||
},
|
||||
created () {
|
||||
this.$store.dispatch('schedule/getScheduleList')
|
||||
// this.$store.dispatch('spider/getSpiderList')
|
||||
this.$store.dispatch('node/getNodeList')
|
||||
|
||||
// 节点列表
|
||||
request.get('/nodes', {}).then(response => {
|
||||
this.nodeList = response.data.data.map(d => {
|
||||
d.systemInfo = {
|
||||
os: '',
|
||||
arch: '',
|
||||
num_cpu: '',
|
||||
executables: []
|
||||
}
|
||||
return d
|
||||
})
|
||||
})
|
||||
|
||||
// 爬虫列表
|
||||
request.get('/spiders', {})
|
||||
.then(response => {
|
||||
this.spiderList = response.data.data.list
|
||||
})
|
||||
}
|
||||
}
|
||||
</script>
|
||||
|
||||
@@ -424,12 +424,10 @@ export default {
|
||||
this.dialogVisible = true
|
||||
},
|
||||
isShowRun (row) {
|
||||
if (this.isCustomized(row)) {
|
||||
// customized spider
|
||||
return !!row.cmd
|
||||
if (row.cmd) {
|
||||
return true
|
||||
} else {
|
||||
// configurable spider
|
||||
return !!row.fields
|
||||
return false
|
||||
}
|
||||
},
|
||||
isCustomized (row) {
|
||||
|
||||
@@ -35,6 +35,7 @@ import {
|
||||
import TaskOverview from '../../components/Overview/TaskOverview'
|
||||
import GeneralTableView from '../../components/TableView/GeneralTableView'
|
||||
import LogView from '../../components/ScrollView/LogView'
|
||||
import request from '../../api/request'
|
||||
|
||||
export default {
|
||||
name: 'TaskDetail',
|
||||
@@ -46,12 +47,12 @@ export default {
|
||||
data () {
|
||||
return {
|
||||
activeTabName: 'overview',
|
||||
handle: undefined
|
||||
handle: undefined,
|
||||
taskLog: ''
|
||||
}
|
||||
},
|
||||
computed: {
|
||||
...mapState('task', [
|
||||
'taskLog',
|
||||
'taskResultsData',
|
||||
'taskResultsTotalCount'
|
||||
]),
|
||||
@@ -97,18 +98,23 @@ export default {
|
||||
downloadCSV () {
|
||||
this.$store.dispatch('task/getTaskResultExcel', this.$route.params.id)
|
||||
this.$st.sendEv('任务详情-结果', '下载CSV')
|
||||
},
|
||||
getTaskLog () {
|
||||
if (this.$route.params.id) {
|
||||
request.get(`/tasks/${this.$route.params.id}/log`).then(response => {
|
||||
this.taskLog = response.data.data
|
||||
})
|
||||
}
|
||||
}
|
||||
},
|
||||
async created () {
|
||||
await this.$store.dispatch('task/getTaskData', this.$route.params.id)
|
||||
this.$store.dispatch('task/getTaskLog', this.$route.params.id)
|
||||
created () {
|
||||
this.$store.dispatch('task/getTaskData', this.$route.params.id)
|
||||
this.$store.dispatch('task/getTaskResults', this.$route.params.id)
|
||||
|
||||
if (this.taskForm && ['running'].includes(this.taskForm.status)) {
|
||||
this.handle = setInterval(() => {
|
||||
this.$store.dispatch('task/getTaskLog', this.$route.params.id)
|
||||
}, 5000)
|
||||
}
|
||||
this.getTaskLog()
|
||||
this.handle = setInterval(() => {
|
||||
this.getTaskLog()
|
||||
}, 5000)
|
||||
},
|
||||
destroyed () {
|
||||
clearInterval(this.handle)
|
||||
|
||||
Reference in New Issue
Block a user