diff --git a/core/controllers/database_v2.go b/core/controllers/database_v2.go deleted file mode 100644 index 771cee8d..00000000 --- a/core/controllers/database_v2.go +++ /dev/null @@ -1,117 +0,0 @@ -package controllers - -import ( - "github.com/crawlab-team/crawlab/core/ds" - "github.com/crawlab-team/crawlab/core/errors" - "github.com/crawlab-team/crawlab/core/models/models/v2" - "github.com/crawlab-team/crawlab/core/models/service" - "github.com/gin-gonic/gin" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -func PostDatabase(c *gin.Context) { - // data source - var payload struct { - Name string `json:"name"` - Type string `json:"type"` - Description string `json:"description"` - Host string `json:"host"` - Port int `json:"port"` - Url string `json:"url"` - Hosts []string `json:"hosts"` - Database string `json:"database"` - Username string `json:"username"` - Password string `json:"-,omitempty"` - ConnectType string `json:"connect_type"` - Status string `json:"status"` - Error string `json:"error"` - } - if err := c.ShouldBindJSON(&payload); err != nil { - HandleErrorBadRequest(c, err) - return - } - - u := GetUserFromContextV2(c) - - // add data source to db - dataSource := models.DatabaseV2{ - Name: payload.Name, - Type: payload.Type, - Description: payload.Description, - Host: payload.Host, - Port: payload.Port, - URI: payload.Url, - Database: payload.Database, - Username: payload.Username, - Password: payload.Password, - ConnectType: payload.ConnectType, - Status: payload.Status, - Error: payload.Error, - } - dataSource.SetCreated(u.Id) - dataSource.SetUpdated(u.Id) - id, err := service.NewModelServiceV2[models.DatabaseV2]().InsertOne(dataSource) - if err != nil { - HandleErrorInternalServerError(c, err) - return - } - dataSource.Id = id - - // check data source status - go func() { - _ = ds.GetDataSourceServiceV2().CheckStatus(id) - }() - - HandleSuccessWithData(c, dataSource) -} - -func PutDatabaseById(c *gin.Context) { - id, err := primitive.ObjectIDFromHex(c.Param("id")) - if err != nil { - HandleErrorInternalServerError(c, err) - return - } - - // data source - var dataSource models.DatabaseV2 - if err := c.ShouldBindJSON(&dataSource); err != nil { - HandleErrorBadRequest(c, err) - return - } - - err = service.NewModelServiceV2[models.DatabaseV2]().ReplaceById(id, dataSource) - if err != nil { - HandleErrorInternalServerError(c, err) - return - } - - // check data source status - go func() { - _ = ds.GetDataSourceServiceV2().CheckStatus(id) - }() -} - -func PostDatabaseChangePassword(c *gin.Context) { - id, err := primitive.ObjectIDFromHex(c.Param("id")) - if err != nil { - HandleErrorBadRequest(c, err) - return - } - var payload struct { - Password string `json:"password"` - } - if err := c.ShouldBindJSON(&payload); err != nil { - HandleErrorBadRequest(c, err) - return - } - if payload.Password == "" { - HandleErrorBadRequest(c, errors.ErrorDataSourceMissingRequiredFields) - return - } - u := GetUserFromContextV2(c) - if err := ds.GetDataSourceServiceV2().ChangePassword(id, payload.Password, u.Id); err != nil { - HandleErrorInternalServerError(c, err) - return - } - HandleSuccess(c) -} diff --git a/core/ds/service_v2.go b/core/ds/service_v2.go deleted file mode 100644 index fbf8fe24..00000000 --- a/core/ds/service_v2.go +++ /dev/null @@ -1,271 +0,0 @@ -package ds - -import ( - "github.com/apex/log" - "github.com/crawlab-team/crawlab/core/constants" - constants2 "github.com/crawlab-team/crawlab/core/constants" - "github.com/crawlab-team/crawlab/core/models/models/v2" - "github.com/crawlab-team/crawlab/core/models/service" - "github.com/crawlab-team/crawlab/core/result" - "github.com/crawlab-team/crawlab/core/utils" - utils2 "github.com/crawlab-team/crawlab/core/utils" - "github.com/crawlab-team/crawlab/trace" - "go.mongodb.org/mongo-driver/bson/primitive" - "sync" - "time" -) - -type ServiceV2 struct { - // internals - timeout time.Duration - monitorInterval time.Duration - stopped bool -} - -func (svc *ServiceV2) Init() { - // result service registry - reg := result.GetResultServiceRegistry() - - // register result services - reg.Register(constants.DataSourceTypeMongo, NewDataSourceMongoService) - reg.Register(constants.DataSourceTypeMysql, NewDataSourceMysqlService) - reg.Register(constants.DataSourceTypePostgresql, NewDataSourcePostgresqlService) - reg.Register(constants.DataSourceTypeMssql, NewDataSourceMssqlService) - reg.Register(constants.DataSourceTypeSqlite, NewDataSourceSqliteService) - reg.Register(constants.DataSourceTypeCockroachdb, NewDataSourceCockroachdbService) - reg.Register(constants.DataSourceTypeElasticSearch, NewDataSourceElasticsearchService) - reg.Register(constants.DataSourceTypeKafka, NewDataSourceKafkaService) -} - -func (svc *ServiceV2) Start() { - // start monitoring - go svc.Monitor() -} - -func (svc *ServiceV2) Wait() { - utils.DefaultWait() -} - -func (svc *ServiceV2) Stop() { - svc.stopped = true -} - -func (svc *ServiceV2) ChangePassword(id primitive.ObjectID, password string, by primitive.ObjectID) (err error) { - dataSource, err := service.NewModelServiceV2[models.DatabaseV2]().GetById(id) - if err != nil { - return err - } - dataSource.Password, err = utils.EncryptAES(password) - if err != nil { - return err - } - dataSource.SetUpdated(by) - err = service.NewModelServiceV2[models.DatabaseV2]().ReplaceById(id, *dataSource) - if err != nil { - return err - } - return nil -} - -func (svc *ServiceV2) Monitor() { - for { - // return if stopped - if svc.stopped { - return - } - - // monitor - if err := svc.monitor(); err != nil { - trace.PrintError(err) - } - - // wait - time.Sleep(svc.monitorInterval) - } -} - -func (svc *ServiceV2) CheckStatus(id primitive.ObjectID) (err error) { - ds, err := service.NewModelServiceV2[models.DatabaseV2]().GetById(id) - if err != nil { - return err - } - return svc.checkStatus(ds, true) -} - -func (svc *ServiceV2) SetTimeout(duration time.Duration) { - svc.timeout = duration -} - -func (svc *ServiceV2) SetMonitorInterval(duration time.Duration) { - svc.monitorInterval = duration -} - -func (svc *ServiceV2) monitor() (err error) { - // start - tic := time.Now() - log.Debugf("[DataSourceService] start monitoring") - - // data source list - dataSources, err := service.NewModelServiceV2[models.DatabaseV2]().GetMany(nil, nil) - if err != nil { - return err - } - - // waiting group - wg := sync.WaitGroup{} - wg.Add(len(dataSources)) - - // iterate data source list - for _, ds := range dataSources { - // async operation - go func(ds *models.DatabaseV2) { - // check status and save - _ = svc.checkStatus(ds, true) - - // release - wg.Done() - }(&ds) - } - - // wait - wg.Wait() - - // finish - toc := time.Now() - log.Debugf("[DataSourceService] finished monitoring. elapsed: %d ms", (toc.Sub(tic)).Milliseconds()) - - return nil -} - -func (svc *ServiceV2) checkStatus(ds *models.DatabaseV2, save bool) (err error) { - // check status - if err := svc._checkStatus(ds); err != nil { - ds.Status = constants2.DataSourceStatusOffline - ds.Error = err.Error() - } else { - ds.Status = constants2.DataSourceStatusOnline - ds.Error = "" - } - - // save - if save { - return svc._save(ds) - } - - return nil -} - -func (svc *ServiceV2) _save(ds *models.DatabaseV2) (err error) { - log.Debugf("[DataSourceService] saving data source: name=%s, type=%s, status=%s, error=%s", ds.Name, ds.Type, ds.Status, ds.Error) - return service.NewModelServiceV2[models.DatabaseV2]().ReplaceById(ds.Id, *ds) -} - -func (svc *ServiceV2) _checkStatus(ds *models.DatabaseV2) (err error) { - switch ds.Type { - case constants.DataSourceTypeMongo: - _, err := utils2.GetMongoClientWithTimeoutV2(ds, svc.timeout) - if err != nil { - return err - } - case constants.DataSourceTypeMysql: - s, err := utils2.GetMysqlSessionWithTimeoutV2(ds, svc.timeout) - if err != nil { - return err - } - if s != nil { - err := s.Close() - if err != nil { - return err - } - } - case constants.DataSourceTypePostgresql: - s, err := utils2.GetPostgresqlSessionWithTimeoutV2(ds, svc.timeout) - if err != nil { - return err - } - if s != nil { - err := s.Close() - if err != nil { - return err - } - } - case constants.DataSourceTypeMssql: - s, err := utils2.GetMssqlSessionWithTimeoutV2(ds, svc.timeout) - if err != nil { - return err - } - if s != nil { - err := s.Close() - if err != nil { - return err - } - } - case constants.DataSourceTypeSqlite: - s, err := utils2.GetSqliteSessionWithTimeoutV2(ds, svc.timeout) - if err != nil { - return err - } - if s != nil { - err := s.Close() - if err != nil { - return err - } - } - case constants.DataSourceTypeCockroachdb: - s, err := utils2.GetCockroachdbSessionWithTimeoutV2(ds, svc.timeout) - if err != nil { - return err - } - if s != nil { - err := s.Close() - if err != nil { - return err - } - } - case constants.DataSourceTypeElasticSearch: - _, err := utils2.GetElasticsearchClientWithTimeoutV2(ds, svc.timeout) - if err != nil { - return err - } - case constants.DataSourceTypeKafka: - c, err := utils2.GetKafkaConnectionWithTimeoutV2(ds, svc.timeout) - if err != nil { - return err - } - if c != nil { - err := c.Close() - if err != nil { - return err - } - } - default: - log.Warnf("[DataSourceService] invalid data source type: %s", ds.Type) - } - return nil -} - -func NewDataSourceServiceV2() *ServiceV2 { - // service - svc := &ServiceV2{ - monitorInterval: 15 * time.Second, - timeout: 10 * time.Second, - } - - // initialize - svc.Init() - - // start - svc.Start() - - return svc -} - -var _dsSvcV2 *ServiceV2 - -func GetDataSourceServiceV2() *ServiceV2 { - if _dsSvcV2 != nil { - return _dsSvcV2 - } - _dsSvcV2 = NewDataSourceServiceV2() - return _dsSvcV2 -} diff --git a/core/entity/database.go b/core/entity/database.go index e0033781..0dee0855 100644 --- a/core/entity/database.go +++ b/core/entity/database.go @@ -12,6 +12,7 @@ type Database struct { type DatabaseTable struct { Name string `json:"name"` Columns []DatabaseColumn `json:"columns"` + Indexes []DatabaseIndex `json:"indexes"` } type DatabaseColumn struct { @@ -23,3 +24,15 @@ type DatabaseColumn struct { Extra string `json:"extra,omitempty"` Children []DatabaseColumn `json:"children,omitempty"` } + +type DatabaseIndex struct { + Name string `json:"name"` + Type string `json:"type,omitempty"` + Columns []DatabaseIndexColumn `json:"columns"` + Unique bool `json:"unique"` +} + +type DatabaseIndexColumn struct { + Name string `json:"name"` + Order int `json:"order"` +} diff --git a/core/models/models/v2/database_v2.go b/core/models/models/v2/database_v2.go index 673bd92a..1d084a82 100644 --- a/core/models/models/v2/database_v2.go +++ b/core/models/models/v2/database_v2.go @@ -6,20 +6,19 @@ type DatabaseV2 struct { any `collection:"databases"` BaseModelV2[DatabaseV2] `bson:",inline"` Name string `json:"name" bson:"name"` - DataSource string `json:"data_source" bson:"data_source"` - Type string `json:"type" bson:"type"` Description string `json:"description" bson:"description"` + DataSource string `json:"data_source" bson:"data_source"` Host string `json:"host" bson:"host"` Port int `json:"port" bson:"port"` URI string `json:"uri,omitempty" bson:"uri,omitempty"` Database string `json:"database,omitempty" bson:"database,omitempty"` Username string `json:"username,omitempty" bson:"username,omitempty"` Password string `json:"-,omitempty" bson:"password,omitempty"` - ConnectType string `json:"connect_type,omitempty" bson:"connect_type,omitempty"` Status string `json:"status" bson:"status"` Error string `json:"error" bson:"error"` Active bool `json:"active" bson:"active"` ActiveAt time.Time `json:"active_ts" bson:"active_ts"` + IsDefault bool `json:"is_default" bson:"-"` MongoParams *struct { AuthSource string `json:"auth_source,omitempty" bson:"auth_source,omitempty"` @@ -36,11 +35,11 @@ type DatabaseV2 struct { } `json:"snowflake_params,omitempty" bson:"snowflake_params,omitempty"` CassandraParams *struct { Keyspace string `json:"keyspace,omitempty" bson:"keyspace,omitempty"` - } + } `json:"cassandra_params,omitempty" bson:"cassandra_params,omitempty"` HiveParams *struct { Auth string `json:"auth,omitempty" bson:"auth,omitempty"` - } + } `json:"hive_params,omitempty" bson:"hive_params,omitempty"` RedisParams *struct { DB int `json:"db,omitempty" bson:"db,omitempty"` - } + } `json:"redis_params,omitempty" bson:"redis_params,omitempty"` }