fix: missing data source issue

This commit is contained in:
Marvin Zhang
2024-06-26 12:37:24 +08:00
parent 5daeccb87d
commit 326a8d67d0
18 changed files with 843 additions and 45 deletions

View File

@@ -58,3 +58,48 @@ func getCockroachdbSession(ctx context.Context, ds *models.DataSource) (s db.Ses
return s, err
}
func GetCockroachdbSessionWithTimeoutV2(ds *models.DataSourceV2, timeout time.Duration) (s db.Session, err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return getCockroachdbSessionV2(ctx, ds)
}
func getCockroachdbSessionV2(ctx context.Context, ds *models.DataSourceV2) (s db.Session, err error) {
// normalize settings
host := ds.Host
port := ds.Port
if ds.Host == "" {
host = constants.DefaultHost
}
if ds.Port == "" {
port = constants.DefaultCockroachdbPort
}
// connect settings
settings := mssql.ConnectionURL{
User: ds.Username,
Password: ds.Password,
Database: ds.Database,
Host: fmt.Sprintf("%s:%s", host, port),
Options: nil,
}
// session
done := make(chan struct{})
go func() {
s, err = mssql.Open(settings)
close(done)
}()
// wait for done
select {
case <-ctx.Done():
if ctx.Err() != nil {
err = ctx.Err()
}
case <-done:
}
return s, err
}

View File

@@ -50,34 +50,75 @@ func getElasticsearchClient(ctx context.Context, ds *models.DataSource) (c *elas
Addresses: addresses,
Username: ds.Username,
Password: ds.Password,
//CloudID: "",
//APIKey: "",
//ServiceToken: "",
//CertificateFingerprint: "",
//Header: nil,
//CACert: nil,
//RetryOnStatus: nil,
//DisableRetry: false,
//EnableRetryOnTimeout: false,
//MaxRetries: 0,
//CompressRequestBody: false,
//DiscoverNodesOnStart: false,
//DiscoverNodesInterval: 0,
//EnableMetrics: false,
//EnableDebugLogger: false,
//EnableCompatibilityMode: false,
//DisableMetaHeader: false,
//UseResponseCheckOnly: false,
RetryBackoff: func(i int) time.Duration {
if i == 1 {
rb.Reset()
}
return rb.NextBackOff()
},
//Transport: nil,
//Logger: nil,
//Selector: nil,
//ConnectionPoolFunc: nil,
}
// es client
done := make(chan struct{})
go func() {
c, err = elasticsearch.NewClient(cfg)
if err != nil {
return
}
var res *esapi.Response
res, err = c.Info()
fmt.Println(res)
close(done)
}()
// wait for done
select {
case <-ctx.Done():
if ctx.Err() != nil {
err = ctx.Err()
}
case <-done:
}
return c, err
}
func GetElasticsearchClientWithTimeoutV2(ds *models.DataSourceV2, timeout time.Duration) (c *elasticsearch.Client, err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return getElasticsearchClientV2(ctx, ds)
}
func getElasticsearchClientV2(ctx context.Context, ds *models.DataSourceV2) (c *elasticsearch.Client, err error) {
// normalize settings
host := ds.Host
port := ds.Port
if ds.Host == "" {
host = constants.DefaultHost
}
if ds.Port == "" {
port = constants.DefaultElasticsearchPort
}
// es hosts
addresses := []string{
fmt.Sprintf("http://%s:%s", host, port),
}
// retry backoff
rb := backoff.NewExponentialBackOff()
// es client options
cfg := elasticsearch.Config{
Addresses: addresses,
Username: ds.Username,
Password: ds.Password,
RetryBackoff: func(i int) time.Duration {
if i == 1 {
rb.Reset()
}
return rb.NextBackOff()
},
}
// es client

View File

@@ -39,3 +39,30 @@ func getKafkaConnection(ctx context.Context, ds *models.DataSource) (c *kafka.Co
// kafka connection
return kafka.DialLeader(ctx, network, address, topic, partition)
}
func GetKafkaConnectionWithTimeoutV2(ds *models.DataSourceV2, timeout time.Duration) (c *kafka.Conn, err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return getKafkaConnectionV2(ctx, ds)
}
func getKafkaConnectionV2(ctx context.Context, ds *models.DataSourceV2) (c *kafka.Conn, err error) {
// normalize settings
host := ds.Host
port := ds.Port
if ds.Host == "" {
host = constants.DefaultHost
}
if ds.Port == "" {
port = constants.DefaultKafkaPort
}
// kafka connection address
network := "tcp"
address := fmt.Sprintf("%s:%s", host, port)
topic := ds.Database
partition := 0 // TODO: parameterize
// kafka connection
return kafka.DialLeader(ctx, network, address, topic, partition)
}

View File

@@ -54,6 +54,12 @@ func GetMongoClientWithTimeout(ds *models.DataSource, timeout time.Duration) (c
return getMongoClient(ctx, ds)
}
func GetMongoClientWithTimeoutV2(ds *models.DataSourceV2, timeout time.Duration) (c *mongo2.Client, err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return getMongoClientV2(ctx, ds)
}
func getMongoClient(ctx context.Context, ds *models.DataSource) (c *mongo2.Client, err error) {
// normalize settings
if ds.Host == "" {
@@ -92,3 +98,42 @@ func getMongoClient(ctx context.Context, ds *models.DataSource) (c *mongo2.Clien
// client
return mongo.GetMongoClient(opts...)
}
func getMongoClientV2(ctx context.Context, ds *models.DataSourceV2) (c *mongo2.Client, err error) {
// normalize settings
if ds.Host == "" {
ds.Host = constants.DefaultHost
}
if ds.Port == "" {
ds.Port = constants.DefaultMongoPort
}
// options
var opts []mongo.ClientOption
opts = append(opts, mongo.WithContext(ctx))
opts = append(opts, mongo.WithUri(ds.Url))
opts = append(opts, mongo.WithHost(ds.Host))
opts = append(opts, mongo.WithPort(ds.Port))
opts = append(opts, mongo.WithDb(ds.Database))
opts = append(opts, mongo.WithUsername(ds.Username))
opts = append(opts, mongo.WithPassword(ds.Password))
opts = append(opts, mongo.WithHosts(ds.Hosts))
// extra
if ds.Extra != nil {
// auth source
authSource, ok := ds.Extra["auth_source"]
if ok {
opts = append(opts, mongo.WithAuthSource(authSource))
}
// auth mechanism
authMechanism, ok := ds.Extra["auth_mechanism"]
if ok {
opts = append(opts, mongo.WithAuthMechanism(authMechanism))
}
}
// client
return mongo.GetMongoClient(opts...)
}

View File

@@ -58,3 +58,48 @@ func getMssqlSession(ctx context.Context, ds *models.DataSource) (s db.Session,
return s, err
}
func GetMssqlSessionWithTimeoutV2(ds *models.DataSourceV2, timeout time.Duration) (s db.Session, err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return getMssqlSessionV2(ctx, ds)
}
func getMssqlSessionV2(ctx context.Context, ds *models.DataSourceV2) (s db.Session, err error) {
// normalize settings
host := ds.Host
port := ds.Port
if ds.Host == "" {
host = constants.DefaultHost
}
if ds.Port == "" {
port = constants.DefaultMssqlPort
}
// connect settings
settings := mssql.ConnectionURL{
User: ds.Username,
Password: ds.Password,
Database: ds.Database,
Host: fmt.Sprintf("%s:%s", host, port),
Options: nil,
}
// session
done := make(chan struct{})
go func() {
s, err = mssql.Open(settings)
close(done)
}()
// wait for done
select {
case <-ctx.Done():
if ctx.Err() != nil {
err = ctx.Err()
}
case <-done:
}
return s, err
}

View File

@@ -58,3 +58,48 @@ func getMysqlSession(ctx context.Context, ds *models.DataSource) (s db.Session,
return s, err
}
func GetMysqlSessionWithTimeoutV2(ds *models.DataSourceV2, timeout time.Duration) (s db.Session, err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return getMysqlSessionV2(ctx, ds)
}
func getMysqlSessionV2(ctx context.Context, ds *models.DataSourceV2) (s db.Session, err error) {
// normalize settings
host := ds.Host
port := ds.Port
if ds.Host == "" {
host = constants.DefaultHost
}
if ds.Port == "" {
port = constants.DefaultMysqlPort
}
// connect settings
settings := mysql.ConnectionURL{
User: ds.Username,
Password: ds.Password,
Database: ds.Database,
Host: fmt.Sprintf("%s:%s", host, port),
Options: nil,
}
// session
done := make(chan struct{})
go func() {
s, err = mysql.Open(settings)
close(done)
}()
// wait for done
select {
case <-ctx.Done():
if ctx.Err() != nil {
err = ctx.Err()
}
case <-done:
}
return s, err
}

View File

@@ -58,3 +58,48 @@ func getPostgresqlSession(ctx context.Context, ds *models.DataSource) (s db.Sess
return s, err
}
func GetPostgresqlSessionWithTimeoutV2(ds *models.DataSourceV2, timeout time.Duration) (s db.Session, err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return getPostgresqlSessionV2(ctx, ds)
}
func getPostgresqlSessionV2(ctx context.Context, ds *models.DataSourceV2) (s db.Session, err error) {
// normalize settings
host := ds.Host
port := ds.Port
if ds.Host == "" {
host = constants.DefaultHost
}
if ds.Port == "" {
port = constants.DefaultPostgresqlPort
}
// connect settings
settings := postgresql.ConnectionURL{
User: ds.Username,
Password: ds.Password,
Database: ds.Database,
Host: fmt.Sprintf("%s:%s", host, port),
Options: nil,
}
// session
done := make(chan struct{})
go func() {
s, err = postgresql.Open(settings)
close(done)
}()
// wait for done
select {
case <-ctx.Done():
if ctx.Err() != nil {
err = ctx.Err()
}
case <-done:
}
return s, err
}

View File

@@ -43,3 +43,35 @@ func getSqliteSession(ctx context.Context, ds *models.DataSource) (s db.Session,
return s, err
}
func GetSqliteSessionWithTimeoutV2(ds *models.DataSourceV2, timeout time.Duration) (s db.Session, err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return getSqliteSessionV2(ctx, ds)
}
func getSqliteSessionV2(ctx context.Context, ds *models.DataSourceV2) (s db.Session, err error) {
// connect settings
settings := sqlite.ConnectionURL{
Database: ds.Database,
Options: nil,
}
// session
done := make(chan struct{})
go func() {
s, err = sqlite.Open(settings)
close(done)
}()
// wait for done
select {
case <-ctx.Done():
if ctx.Err() != nil {
err = ctx.Err()
}
case <-done:
}
return s, err
}