mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
🚀 feat: add IsDefault field to DatabaseV2 model
🔨 refactor: modify GetDatabaseList, GetDatabaseById, PostDatabaseTestConnection, and GetDatabaseMetadata functions 🐞 fix: GetDatabaseById and GetMetadata functions in MongoService 🔧 chore: add GetDefaultDatabase and GetDatabaseById functions to utils file
This commit is contained in:
@@ -1,117 +0,0 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"github.com/crawlab-team/crawlab/core/ds"
|
||||
"github.com/crawlab-team/crawlab/core/errors"
|
||||
"github.com/crawlab-team/crawlab/core/models/models/v2"
|
||||
"github.com/crawlab-team/crawlab/core/models/service"
|
||||
"github.com/gin-gonic/gin"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
func PostDatabase(c *gin.Context) {
|
||||
// data source
|
||||
var payload struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Description string `json:"description"`
|
||||
Host string `json:"host"`
|
||||
Port int `json:"port"`
|
||||
Url string `json:"url"`
|
||||
Hosts []string `json:"hosts"`
|
||||
Database string `json:"database"`
|
||||
Username string `json:"username"`
|
||||
Password string `json:"-,omitempty"`
|
||||
ConnectType string `json:"connect_type"`
|
||||
Status string `json:"status"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&payload); err != nil {
|
||||
HandleErrorBadRequest(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
u := GetUserFromContextV2(c)
|
||||
|
||||
// add data source to db
|
||||
dataSource := models.DatabaseV2{
|
||||
Name: payload.Name,
|
||||
Type: payload.Type,
|
||||
Description: payload.Description,
|
||||
Host: payload.Host,
|
||||
Port: payload.Port,
|
||||
URI: payload.Url,
|
||||
Database: payload.Database,
|
||||
Username: payload.Username,
|
||||
Password: payload.Password,
|
||||
ConnectType: payload.ConnectType,
|
||||
Status: payload.Status,
|
||||
Error: payload.Error,
|
||||
}
|
||||
dataSource.SetCreated(u.Id)
|
||||
dataSource.SetUpdated(u.Id)
|
||||
id, err := service.NewModelServiceV2[models.DatabaseV2]().InsertOne(dataSource)
|
||||
if err != nil {
|
||||
HandleErrorInternalServerError(c, err)
|
||||
return
|
||||
}
|
||||
dataSource.Id = id
|
||||
|
||||
// check data source status
|
||||
go func() {
|
||||
_ = ds.GetDataSourceServiceV2().CheckStatus(id)
|
||||
}()
|
||||
|
||||
HandleSuccessWithData(c, dataSource)
|
||||
}
|
||||
|
||||
func PutDatabaseById(c *gin.Context) {
|
||||
id, err := primitive.ObjectIDFromHex(c.Param("id"))
|
||||
if err != nil {
|
||||
HandleErrorInternalServerError(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
// data source
|
||||
var dataSource models.DatabaseV2
|
||||
if err := c.ShouldBindJSON(&dataSource); err != nil {
|
||||
HandleErrorBadRequest(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = service.NewModelServiceV2[models.DatabaseV2]().ReplaceById(id, dataSource)
|
||||
if err != nil {
|
||||
HandleErrorInternalServerError(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
// check data source status
|
||||
go func() {
|
||||
_ = ds.GetDataSourceServiceV2().CheckStatus(id)
|
||||
}()
|
||||
}
|
||||
|
||||
func PostDatabaseChangePassword(c *gin.Context) {
|
||||
id, err := primitive.ObjectIDFromHex(c.Param("id"))
|
||||
if err != nil {
|
||||
HandleErrorBadRequest(c, err)
|
||||
return
|
||||
}
|
||||
var payload struct {
|
||||
Password string `json:"password"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&payload); err != nil {
|
||||
HandleErrorBadRequest(c, err)
|
||||
return
|
||||
}
|
||||
if payload.Password == "" {
|
||||
HandleErrorBadRequest(c, errors.ErrorDataSourceMissingRequiredFields)
|
||||
return
|
||||
}
|
||||
u := GetUserFromContextV2(c)
|
||||
if err := ds.GetDataSourceServiceV2().ChangePassword(id, payload.Password, u.Id); err != nil {
|
||||
HandleErrorInternalServerError(c, err)
|
||||
return
|
||||
}
|
||||
HandleSuccess(c)
|
||||
}
|
||||
@@ -1,271 +0,0 @@
|
||||
package ds
|
||||
|
||||
import (
|
||||
"github.com/apex/log"
|
||||
"github.com/crawlab-team/crawlab/core/constants"
|
||||
constants2 "github.com/crawlab-team/crawlab/core/constants"
|
||||
"github.com/crawlab-team/crawlab/core/models/models/v2"
|
||||
"github.com/crawlab-team/crawlab/core/models/service"
|
||||
"github.com/crawlab-team/crawlab/core/result"
|
||||
"github.com/crawlab-team/crawlab/core/utils"
|
||||
utils2 "github.com/crawlab-team/crawlab/core/utils"
|
||||
"github.com/crawlab-team/crawlab/trace"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ServiceV2 struct {
|
||||
// internals
|
||||
timeout time.Duration
|
||||
monitorInterval time.Duration
|
||||
stopped bool
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) Init() {
|
||||
// result service registry
|
||||
reg := result.GetResultServiceRegistry()
|
||||
|
||||
// register result services
|
||||
reg.Register(constants.DataSourceTypeMongo, NewDataSourceMongoService)
|
||||
reg.Register(constants.DataSourceTypeMysql, NewDataSourceMysqlService)
|
||||
reg.Register(constants.DataSourceTypePostgresql, NewDataSourcePostgresqlService)
|
||||
reg.Register(constants.DataSourceTypeMssql, NewDataSourceMssqlService)
|
||||
reg.Register(constants.DataSourceTypeSqlite, NewDataSourceSqliteService)
|
||||
reg.Register(constants.DataSourceTypeCockroachdb, NewDataSourceCockroachdbService)
|
||||
reg.Register(constants.DataSourceTypeElasticSearch, NewDataSourceElasticsearchService)
|
||||
reg.Register(constants.DataSourceTypeKafka, NewDataSourceKafkaService)
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) Start() {
|
||||
// start monitoring
|
||||
go svc.Monitor()
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) Wait() {
|
||||
utils.DefaultWait()
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) Stop() {
|
||||
svc.stopped = true
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) ChangePassword(id primitive.ObjectID, password string, by primitive.ObjectID) (err error) {
|
||||
dataSource, err := service.NewModelServiceV2[models.DatabaseV2]().GetById(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dataSource.Password, err = utils.EncryptAES(password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dataSource.SetUpdated(by)
|
||||
err = service.NewModelServiceV2[models.DatabaseV2]().ReplaceById(id, *dataSource)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) Monitor() {
|
||||
for {
|
||||
// return if stopped
|
||||
if svc.stopped {
|
||||
return
|
||||
}
|
||||
|
||||
// monitor
|
||||
if err := svc.monitor(); err != nil {
|
||||
trace.PrintError(err)
|
||||
}
|
||||
|
||||
// wait
|
||||
time.Sleep(svc.monitorInterval)
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) CheckStatus(id primitive.ObjectID) (err error) {
|
||||
ds, err := service.NewModelServiceV2[models.DatabaseV2]().GetById(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return svc.checkStatus(ds, true)
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) SetTimeout(duration time.Duration) {
|
||||
svc.timeout = duration
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) SetMonitorInterval(duration time.Duration) {
|
||||
svc.monitorInterval = duration
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) monitor() (err error) {
|
||||
// start
|
||||
tic := time.Now()
|
||||
log.Debugf("[DataSourceService] start monitoring")
|
||||
|
||||
// data source list
|
||||
dataSources, err := service.NewModelServiceV2[models.DatabaseV2]().GetMany(nil, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// waiting group
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(dataSources))
|
||||
|
||||
// iterate data source list
|
||||
for _, ds := range dataSources {
|
||||
// async operation
|
||||
go func(ds *models.DatabaseV2) {
|
||||
// check status and save
|
||||
_ = svc.checkStatus(ds, true)
|
||||
|
||||
// release
|
||||
wg.Done()
|
||||
}(&ds)
|
||||
}
|
||||
|
||||
// wait
|
||||
wg.Wait()
|
||||
|
||||
// finish
|
||||
toc := time.Now()
|
||||
log.Debugf("[DataSourceService] finished monitoring. elapsed: %d ms", (toc.Sub(tic)).Milliseconds())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) checkStatus(ds *models.DatabaseV2, save bool) (err error) {
|
||||
// check status
|
||||
if err := svc._checkStatus(ds); err != nil {
|
||||
ds.Status = constants2.DataSourceStatusOffline
|
||||
ds.Error = err.Error()
|
||||
} else {
|
||||
ds.Status = constants2.DataSourceStatusOnline
|
||||
ds.Error = ""
|
||||
}
|
||||
|
||||
// save
|
||||
if save {
|
||||
return svc._save(ds)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) _save(ds *models.DatabaseV2) (err error) {
|
||||
log.Debugf("[DataSourceService] saving data source: name=%s, type=%s, status=%s, error=%s", ds.Name, ds.Type, ds.Status, ds.Error)
|
||||
return service.NewModelServiceV2[models.DatabaseV2]().ReplaceById(ds.Id, *ds)
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) _checkStatus(ds *models.DatabaseV2) (err error) {
|
||||
switch ds.Type {
|
||||
case constants.DataSourceTypeMongo:
|
||||
_, err := utils2.GetMongoClientWithTimeoutV2(ds, svc.timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case constants.DataSourceTypeMysql:
|
||||
s, err := utils2.GetMysqlSessionWithTimeoutV2(ds, svc.timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if s != nil {
|
||||
err := s.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case constants.DataSourceTypePostgresql:
|
||||
s, err := utils2.GetPostgresqlSessionWithTimeoutV2(ds, svc.timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if s != nil {
|
||||
err := s.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case constants.DataSourceTypeMssql:
|
||||
s, err := utils2.GetMssqlSessionWithTimeoutV2(ds, svc.timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if s != nil {
|
||||
err := s.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case constants.DataSourceTypeSqlite:
|
||||
s, err := utils2.GetSqliteSessionWithTimeoutV2(ds, svc.timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if s != nil {
|
||||
err := s.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case constants.DataSourceTypeCockroachdb:
|
||||
s, err := utils2.GetCockroachdbSessionWithTimeoutV2(ds, svc.timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if s != nil {
|
||||
err := s.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case constants.DataSourceTypeElasticSearch:
|
||||
_, err := utils2.GetElasticsearchClientWithTimeoutV2(ds, svc.timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case constants.DataSourceTypeKafka:
|
||||
c, err := utils2.GetKafkaConnectionWithTimeoutV2(ds, svc.timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if c != nil {
|
||||
err := c.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
default:
|
||||
log.Warnf("[DataSourceService] invalid data source type: %s", ds.Type)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewDataSourceServiceV2() *ServiceV2 {
|
||||
// service
|
||||
svc := &ServiceV2{
|
||||
monitorInterval: 15 * time.Second,
|
||||
timeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
// initialize
|
||||
svc.Init()
|
||||
|
||||
// start
|
||||
svc.Start()
|
||||
|
||||
return svc
|
||||
}
|
||||
|
||||
var _dsSvcV2 *ServiceV2
|
||||
|
||||
func GetDataSourceServiceV2() *ServiceV2 {
|
||||
if _dsSvcV2 != nil {
|
||||
return _dsSvcV2
|
||||
}
|
||||
_dsSvcV2 = NewDataSourceServiceV2()
|
||||
return _dsSvcV2
|
||||
}
|
||||
@@ -12,6 +12,7 @@ type Database struct {
|
||||
type DatabaseTable struct {
|
||||
Name string `json:"name"`
|
||||
Columns []DatabaseColumn `json:"columns"`
|
||||
Indexes []DatabaseIndex `json:"indexes"`
|
||||
}
|
||||
|
||||
type DatabaseColumn struct {
|
||||
@@ -23,3 +24,15 @@ type DatabaseColumn struct {
|
||||
Extra string `json:"extra,omitempty"`
|
||||
Children []DatabaseColumn `json:"children,omitempty"`
|
||||
}
|
||||
|
||||
type DatabaseIndex struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type,omitempty"`
|
||||
Columns []DatabaseIndexColumn `json:"columns"`
|
||||
Unique bool `json:"unique"`
|
||||
}
|
||||
|
||||
type DatabaseIndexColumn struct {
|
||||
Name string `json:"name"`
|
||||
Order int `json:"order"`
|
||||
}
|
||||
|
||||
@@ -6,20 +6,19 @@ type DatabaseV2 struct {
|
||||
any `collection:"databases"`
|
||||
BaseModelV2[DatabaseV2] `bson:",inline"`
|
||||
Name string `json:"name" bson:"name"`
|
||||
DataSource string `json:"data_source" bson:"data_source"`
|
||||
Type string `json:"type" bson:"type"`
|
||||
Description string `json:"description" bson:"description"`
|
||||
DataSource string `json:"data_source" bson:"data_source"`
|
||||
Host string `json:"host" bson:"host"`
|
||||
Port int `json:"port" bson:"port"`
|
||||
URI string `json:"uri,omitempty" bson:"uri,omitempty"`
|
||||
Database string `json:"database,omitempty" bson:"database,omitempty"`
|
||||
Username string `json:"username,omitempty" bson:"username,omitempty"`
|
||||
Password string `json:"-,omitempty" bson:"password,omitempty"`
|
||||
ConnectType string `json:"connect_type,omitempty" bson:"connect_type,omitempty"`
|
||||
Status string `json:"status" bson:"status"`
|
||||
Error string `json:"error" bson:"error"`
|
||||
Active bool `json:"active" bson:"active"`
|
||||
ActiveAt time.Time `json:"active_ts" bson:"active_ts"`
|
||||
IsDefault bool `json:"is_default" bson:"-"`
|
||||
|
||||
MongoParams *struct {
|
||||
AuthSource string `json:"auth_source,omitempty" bson:"auth_source,omitempty"`
|
||||
@@ -36,11 +35,11 @@ type DatabaseV2 struct {
|
||||
} `json:"snowflake_params,omitempty" bson:"snowflake_params,omitempty"`
|
||||
CassandraParams *struct {
|
||||
Keyspace string `json:"keyspace,omitempty" bson:"keyspace,omitempty"`
|
||||
}
|
||||
} `json:"cassandra_params,omitempty" bson:"cassandra_params,omitempty"`
|
||||
HiveParams *struct {
|
||||
Auth string `json:"auth,omitempty" bson:"auth,omitempty"`
|
||||
}
|
||||
} `json:"hive_params,omitempty" bson:"hive_params,omitempty"`
|
||||
RedisParams *struct {
|
||||
DB int `json:"db,omitempty" bson:"db,omitempty"`
|
||||
}
|
||||
} `json:"redis_params,omitempty" bson:"redis_params,omitempty"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user