diff --git a/backend/apps/api.go b/backend/apps/api.go index a650687f..58d271eb 100644 --- a/backend/apps/api.go +++ b/backend/apps/api.go @@ -33,7 +33,7 @@ func (app *Api) Init() { _ = initModule("redis", redis.InitRedis) // initialize model services - _ = initModule("mode-services", models.InitModelServices) + _ = initModule("model-services", models.InitModelServices) // initialize controllers _ = initModule("controllers", controllers.InitControllers) @@ -45,7 +45,7 @@ func (app *Api) Init() { _ = app.initModuleWithApp("routes", routes.InitRoutes) } -func (app *Api) Run() { +func (app *Api) Start() { host := viper.GetString("server.host") port := viper.GetString("server.port") address := net.JoinHostPort(host, port) @@ -62,9 +62,14 @@ func (app *Api) Run() { } } +func (app *Api) Wait() { + DefaultWait() +} + func (app *Api) Stop() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() + if err := app.srv.Shutdown(ctx); err != nil { log.Error("run server error:" + err.Error()) } diff --git a/backend/apps/base.go b/backend/apps/base.go index 92770361..42fc1a99 100644 --- a/backend/apps/base.go +++ b/backend/apps/base.go @@ -2,6 +2,7 @@ package apps type App interface { Init() - Run() + Start() + Wait() Stop() } diff --git a/backend/apps/handler.go b/backend/apps/handler.go index 0701a105..80bdf033 100644 --- a/backend/apps/handler.go +++ b/backend/apps/handler.go @@ -2,6 +2,7 @@ package apps import ( "github.com/apex/log" + "github.com/crawlab-team/crawlab-core/config" "github.com/crawlab-team/crawlab-core/grpc" ) @@ -9,11 +10,18 @@ type Handler struct { } func (app *Handler) Init() { + // config + _ = initModule("config", config.InitConfig) + + // grpc _ = initModule("grpc", grpc.InitGrpcServices) } -func (app *Handler) Run() { - log.Info("handler has started") +func (app *Handler) Start() { +} + +func (app *Handler) Wait() { + DefaultWait() } func (app *Handler) Stop() { diff --git a/backend/apps/master.go b/backend/apps/master.go index bfbba2fa..c7257d92 100644 --- a/backend/apps/master.go +++ b/backend/apps/master.go @@ -1,23 +1,39 @@ package apps type Master struct { - api *Api + api *Api + scheduler *Scheduler + quit chan int } func (app *Master) Init() { - panic("implement me") + // api + initApp("api", app.api) + + // scheduler + initApp("scheduler", app.scheduler) } -func (app *Master) Run() { - panic("implement me") +func (app *Master) Start() { + go app.api.Start() + go app.scheduler.Start() +} + +func (app *Master) Wait() { + <-app.quit } func (app *Master) Stop() { - panic("implement me") + app.api.Stop() + app.scheduler.Stop() + + app.quit <- 1 } func NewMaster() *Master { return &Master{ - api: NewApi(), + api: NewApi(), + scheduler: NewScheduler(), + quit: make(chan int, 1), } } diff --git a/backend/apps/scheduler.go b/backend/apps/scheduler.go index 56938767..7d779fca 100644 --- a/backend/apps/scheduler.go +++ b/backend/apps/scheduler.go @@ -2,6 +2,7 @@ package apps import ( "github.com/apex/log" + "github.com/crawlab-team/crawlab-core/config" "github.com/crawlab-team/crawlab-core/grpc" ) @@ -9,13 +10,21 @@ type Scheduler struct { } func (app *Scheduler) Init() { + // config + _ = initModule("config", config.InitConfig) + + // grpc _ = initModule("grpc", grpc.InitGrpcServices) } -func (app *Scheduler) Run() { +func (app *Scheduler) Start() { log.Info("scheduler has started") } +func (app *Scheduler) Wait() { + DefaultWait() +} + func (app *Scheduler) Stop() { log.Info("scheduler has stopped") } diff --git a/backend/apps/utils.go b/backend/apps/utils.go index 17987882..20164f2c 100644 --- a/backend/apps/utils.go +++ b/backend/apps/utils.go @@ -15,12 +15,12 @@ func Start(app App) { func start(app App) { app.Init() - go app.Run() - waitForStop() + go app.Start() + app.Wait() app.Stop() } -func waitForStop() { +func DefaultWait() { quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit @@ -35,3 +35,10 @@ func initModule(name string, fn func() error) (err error) { log.Info(fmt.Sprintf("initialized %s successfully", name)) return nil } + +func initApp(name string, app App) { + _ = initModule(name, func() error { + app.Init() + return nil + }) +} diff --git a/backend/apps/worker.go b/backend/apps/worker.go index db1ccf09..add053e2 100644 --- a/backend/apps/worker.go +++ b/backend/apps/worker.go @@ -1,28 +1,31 @@ package apps -import ( - "github.com/apex/log" - "github.com/crawlab-team/crawlab-core/grpc" -) - type Worker struct { handler *Handler + quit chan int } func (app *Worker) Init() { - _ = initModule("grpc", grpc.InitGrpcServices) + initApp("handler", app.handler) } -func (app *Worker) Run() { - log.Info("worker has started") +func (app *Worker) Start() { + go app.handler.Start() +} + +func (app *Worker) Wait() { + <-app.quit } func (app *Worker) Stop() { - log.Info("worker has stopped") + app.handler.Stop() + + app.quit <- 1 } func NewWorker() *Worker { return &Worker{ handler: NewHandler(), + quit: make(chan int, 1), } } diff --git a/backend/cmd/scheduler.go b/backend/cmd/scheduler.go new file mode 100644 index 00000000..ae44a06a --- /dev/null +++ b/backend/cmd/scheduler.go @@ -0,0 +1,22 @@ +package cmd + +import ( + "crawlab/apps" + "github.com/spf13/cobra" +) + +func init() { + rootCmd.AddCommand(schedulerCmd) +} + +var schedulerCmd = &cobra.Command{ + Use: "scheduler", + Aliases: []string{"S"}, + Short: "Start scheduler", + Long: `Start a scheduler instance of Crawlab +which assigns tasks to worker nodes to execute`, + Run: func(cmd *cobra.Command, args []string) { + scheduler := apps.NewScheduler() + apps.Start(scheduler) + }, +}