mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-24 17:41:03 +01:00
feat: updated database related api
This commit is contained in:
@@ -3,6 +3,7 @@ package constants
|
||||
// Field-name constants whose identifiers share the FilterQueryField prefix.
// NOTE(review): presumably the field names accepted in filter queries; confirm against the code that reads them.
const (
|
||||
FilterQueryFieldConditions = "conditions"
|
||||
FilterQueryFieldAll = "all"
|
||||
FilterQueryFieldFilter = "filter"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -15,11 +15,6 @@ const (
|
||||
RunTypeSelectedNodes = "selected-nodes"
|
||||
)
|
||||
|
||||
const (
|
||||
TaskTypeSpider = "spider"
|
||||
TaskTypeSystem = "system"
|
||||
)
|
||||
|
||||
type TaskSignal int
|
||||
|
||||
const (
|
||||
@@ -30,10 +25,6 @@ const (
|
||||
)
|
||||
|
||||
const (
|
||||
TaskListQueuePrefixPublic = "tasks:public"
|
||||
TaskListQueuePrefixNodes = "tasks:nodes"
|
||||
)
|
||||
|
||||
const (
|
||||
TaskKey = "_tid"
|
||||
TaskKey = "_tid"
|
||||
SpiderKey = "_sid"
|
||||
)
|
||||
|
||||
@@ -12,7 +12,7 @@ type DatabaseService interface {
|
||||
GetMetadataAllDb(id primitive.ObjectID) (m *entity.DatabaseMetadata, err error)
|
||||
CreateDatabase(id primitive.ObjectID, databaseName string) (err error)
|
||||
DropDatabase(id primitive.ObjectID, databaseName string) (err error)
|
||||
GetTableMetadata(id primitive.ObjectID, databaseName, tableName string) (table *entity.DatabaseTable, err error)
|
||||
GetTableMetadata(id primitive.ObjectID, databaseName, tableName string, filter map[string]interface{}) (table *entity.DatabaseTable, err error)
|
||||
CreateTable(id primitive.ObjectID, databaseName string, table *entity.DatabaseTable) (err error)
|
||||
ModifyTable(id primitive.ObjectID, databaseName string, table *entity.DatabaseTable) (err error)
|
||||
DropTable(id primitive.ObjectID, databaseName, tableName string) (err error)
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
"github.com/crawlab-team/crawlab/core/task/stats"
|
||||
"github.com/crawlab-team/crawlab/core/utils"
|
||||
"github.com/crawlab-team/crawlab/db/mongo"
|
||||
grpc "github.com/crawlab-team/crawlab/grpc"
|
||||
"github.com/crawlab-team/crawlab/grpc"
|
||||
"github.com/crawlab-team/crawlab/trace"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
@@ -216,16 +216,6 @@ func (svr TaskServerV2) handleInsertData(msg *grpc.StreamMessage) (err error) {
|
||||
}
|
||||
var records []map[string]interface{}
|
||||
for _, d := range data.Records {
|
||||
res, ok := d[constants.TaskKey]
|
||||
if ok {
|
||||
switch res.(type) {
|
||||
case string:
|
||||
id, err := primitive.ObjectIDFromHex(res.(string))
|
||||
if err == nil {
|
||||
d[constants.TaskKey] = id
|
||||
}
|
||||
}
|
||||
}
|
||||
records = append(records, d)
|
||||
}
|
||||
return svr.statsSvc.InsertData(data.TaskId, records...)
|
||||
|
||||
@@ -2,6 +2,7 @@ package stats
|
||||
|
||||
import (
|
||||
log2 "github.com/apex/log"
|
||||
"github.com/crawlab-team/crawlab/core/constants"
|
||||
"github.com/crawlab-team/crawlab/core/database"
|
||||
interfaces2 "github.com/crawlab-team/crawlab/core/database/interfaces"
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
@@ -20,6 +21,7 @@ import (
|
||||
|
||||
type databaseServiceItem struct {
|
||||
taskId primitive.ObjectID
|
||||
spiderId primitive.ObjectID
|
||||
dbId primitive.ObjectID
|
||||
dbSvc interfaces2.DatabaseService
|
||||
tableName string
|
||||
@@ -54,7 +56,7 @@ func (svc *ServiceV2) InsertData(taskId primitive.ObjectID, records ...map[strin
|
||||
tableName := item.tableName
|
||||
if utils.IsPro() && dbSvc != nil {
|
||||
for _, record := range records {
|
||||
if err := dbSvc.CreateRow(dbId, "", tableName, record); err != nil {
|
||||
if err := dbSvc.CreateRow(dbId, "", tableName, svc.normalizeRecord(item, record)); err != nil {
|
||||
log2.Errorf("failed to insert data: %v", err)
|
||||
continue
|
||||
}
|
||||
@@ -63,7 +65,7 @@ func (svc *ServiceV2) InsertData(taskId primitive.ObjectID, records ...map[strin
|
||||
} else {
|
||||
var records2 []interface{}
|
||||
for _, record := range records {
|
||||
records2 = append(records2, record)
|
||||
records2 = append(records2, svc.normalizeRecord(item, record))
|
||||
}
|
||||
_, err = mongo.GetMongoCol(tableName).InsertMany(records2)
|
||||
if err != nil {
|
||||
@@ -118,15 +120,19 @@ func (svc *ServiceV2) getDatabaseServiceItem(taskId primitive.ObjectID) (item *d
|
||||
}
|
||||
}
|
||||
|
||||
// store in cache
|
||||
svc.databaseServiceItems[taskId.Hex()] = &databaseServiceItem{
|
||||
// item
|
||||
item = &databaseServiceItem{
|
||||
taskId: taskId,
|
||||
spiderId: s.Id,
|
||||
dbId: s.DataSourceId,
|
||||
dbSvc: dbSvc,
|
||||
tableName: s.ColName,
|
||||
time: time.Now(),
|
||||
}
|
||||
|
||||
// store in cache
|
||||
svc.databaseServiceItems[taskId.Hex()] = item
|
||||
|
||||
return item, nil
|
||||
}
|
||||
|
||||
@@ -158,6 +164,18 @@ func (svc *ServiceV2) cleanup() {
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) normalizeRecord(item *databaseServiceItem, record map[string]interface{}) (res map[string]interface{}) {
|
||||
res = record
|
||||
|
||||
// set task id
|
||||
res[constants.TaskKey] = item.taskId
|
||||
|
||||
// set spider id
|
||||
res[constants.SpiderKey] = item.spiderId
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func NewTaskStatsServiceV2() (svc2 *ServiceV2, err error) {
|
||||
// service
|
||||
svc := &ServiceV2{
|
||||
|
||||
Reference in New Issue
Block a user