Files
crawlab/fs/seaweedfs_manager.go
2024-06-14 16:37:48 +08:00

723 lines
19 KiB
Go

package fs
import (
"bytes"
"errors"
"fmt"
"github.com/cenkalti/backoff/v4"
"github.com/crawlab-team/crawlab/trace"
"github.com/crawlab-team/goseaweedfs"
"github.com/emirpasic/gods/queues/linkedlistqueue"
"github.com/google/uuid"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"time"
)
type seaweedFsManagerFn func(params seaweedFsManagerParams) (res seaweedFsManagerResults)
type seaweedFsManagerHandle struct {
params seaweedFsManagerParams
fn seaweedFsManagerFn
resChan chan seaweedFsManagerResults
}
type seaweedFsManagerParams struct {
localPath string
remotePath string
isRecursive bool
collection string
ttl string
urlValues url.Values
data []byte
}
type seaweedFsManagerResults struct {
files []goseaweedfs.FilerFileInfo
file *goseaweedfs.FilerFileInfo
data []byte
ok bool
err error
}
type SeaweedFsManager struct {
// settings variables
filerUrl string
timeout time.Duration
authKey string
workerNum int
retryNum uint64
retryInterval time.Duration
maxQps int
// internals
f *goseaweedfs.Filer
q *linkedlistqueue.Queue
cr int
ch chan seaweedFsManagerHandle
closed bool
}
func (m *SeaweedFsManager) Init() (err error) {
// filer options
var filerOpts []goseaweedfs.FilerOption
// auth key
if m.authKey != "" {
filerOpts = append(filerOpts, goseaweedfs.WithFilerAuthKey(m.authKey))
}
// handle channel
m.ch = make(chan seaweedFsManagerHandle, m.workerNum)
// filer instance
m.f, err = goseaweedfs.NewFiler(m.filerUrl, &http.Client{Timeout: m.timeout}, filerOpts...)
if err != nil {
return trace.TraceError(err)
}
// start async
go m.start()
return nil
}
func (m *SeaweedFsManager) Close() (err error) {
m.closed = true
if err := m.f.Close(); err != nil {
return trace.TraceError(err)
}
return nil
}
func (m *SeaweedFsManager) ListDir(remotePath string, isRecursive bool) (files []goseaweedfs.FilerFileInfo, err error) {
params := seaweedFsManagerParams{
remotePath: remotePath,
isRecursive: isRecursive,
}
res := m.process(params, m.listDir)
return res.files, res.err
}
func (m *SeaweedFsManager) ListDirRecursive(remotePath string) (files []goseaweedfs.FilerFileInfo, err error) {
params := seaweedFsManagerParams{
remotePath: remotePath,
}
res := m.process(params, m.listDirRecursive)
return res.files, res.err
}
func (m *SeaweedFsManager) UploadFile(localPath, remotePath string, args ...interface{}) (err error) {
localPath, err = filepath.Abs(localPath)
if err != nil {
return trace.TraceError(err)
}
collection, ttl := getCollectionAndTtlFromArgs(args...)
params := seaweedFsManagerParams{
localPath: localPath,
remotePath: remotePath,
collection: collection,
ttl: ttl,
}
res := m.process(params, m.uploadFile)
return res.err
}
func (m *SeaweedFsManager) UploadDir(localPath, remotePath string, args ...interface{}) (err error) {
localPath, err = filepath.Abs(localPath)
if err != nil {
return trace.TraceError(err)
}
collection, ttl := getCollectionAndTtlFromArgs(args...)
params := seaweedFsManagerParams{
localPath: localPath,
remotePath: remotePath,
collection: collection,
ttl: ttl,
}
res := m.process(params, m.uploadDir)
return res.err
}
func (m *SeaweedFsManager) DownloadFile(remotePath, localPath string, args ...interface{}) (err error) {
localPath, err = filepath.Abs(localPath)
if err != nil {
return trace.TraceError(err)
}
collection, ttl := getCollectionAndTtlFromArgs(args...)
urlValues := getUrlValuesFromArgs(args...)
params := seaweedFsManagerParams{
localPath: localPath,
remotePath: remotePath,
collection: collection,
ttl: ttl,
urlValues: urlValues,
}
res := m.process(params, m.downloadFile)
return res.err
}
func (m *SeaweedFsManager) DownloadDir(remotePath, localPath string, args ...interface{}) (err error) {
localPath, err = filepath.Abs(localPath)
if err != nil {
return trace.TraceError(err)
}
params := seaweedFsManagerParams{
remotePath: remotePath,
}
res := m.process(params, m.downloadDir)
return res.err
}
func (m *SeaweedFsManager) DeleteFile(remotePath string) (err error) {
params := seaweedFsManagerParams{
remotePath: remotePath,
}
res := m.process(params, m.deleteFile)
return res.err
}
func (m *SeaweedFsManager) DeleteDir(remotePath string) (err error) {
params := seaweedFsManagerParams{
remotePath: remotePath,
}
res := m.process(params, m.deleteDir)
return res.err
}
func (m *SeaweedFsManager) SyncLocalToRemote(localPath, remotePath string, args ...interface{}) (err error) {
localPath, err = filepath.Abs(localPath)
if err != nil {
return trace.TraceError(err)
}
collection, ttl := getCollectionAndTtlFromArgs(args...)
params := seaweedFsManagerParams{
localPath: localPath,
remotePath: remotePath,
collection: collection,
ttl: ttl,
}
res := m.process(params, m.syncLocalToRemote)
return res.err
}
func (m *SeaweedFsManager) SyncRemoteToLocal(remotePath, localPath string, args ...interface{}) (err error) {
collection, ttl := getCollectionAndTtlFromArgs(args...)
params := seaweedFsManagerParams{
localPath: localPath,
remotePath: remotePath,
collection: collection,
ttl: ttl,
}
res := m.process(params, m.syncRemoteToLocal)
return res.err
}
func (m *SeaweedFsManager) GetFile(remotePath string, args ...interface{}) (data []byte, err error) {
urlValues := getUrlValuesFromArgs(args...)
params := seaweedFsManagerParams{
remotePath: remotePath,
urlValues: urlValues,
}
res := m.process(params, m.getFile)
return res.data, res.err
}
func (m *SeaweedFsManager) GetFileInfo(remotePath string) (file *goseaweedfs.FilerFileInfo, err error) {
params := seaweedFsManagerParams{
remotePath: remotePath,
}
res := m.process(params, m.getFileInfo)
return res.file, res.err
}
func (m *SeaweedFsManager) UpdateFile(remotePath string, data []byte, args ...interface{}) (err error) {
collection, ttl := getCollectionAndTtlFromArgs(args...)
params := seaweedFsManagerParams{
remotePath: remotePath,
collection: collection,
ttl: ttl,
data: data,
}
res := m.process(params, m.updateFile)
return res.err
}
func (m *SeaweedFsManager) Exists(remotePath string, args ...interface{}) (ok bool, err error) {
_, err = m.GetFile(remotePath, args...)
if err == nil {
// exists
return true, nil
}
if strings.Contains(err.Error(), FilerStatusNotFoundErrorMessage) {
// not exists
return false, nil
}
return ok, trace.TraceError(err)
}
func (m *SeaweedFsManager) SetFilerUrl(url string) {
m.filerUrl = url
}
func (m *SeaweedFsManager) SetFilerAuthKey(authKey string) {
m.authKey = authKey
}
func (m *SeaweedFsManager) SetTimeout(timeout time.Duration) {
m.timeout = timeout
}
func (m *SeaweedFsManager) SetWorkerNum(num int) {
m.workerNum = num
}
func (m *SeaweedFsManager) SetRetryInterval(interval time.Duration) {
m.retryInterval = interval
}
func (m *SeaweedFsManager) SetRetryNum(num int) {
m.retryNum = uint64(num)
}
func (m *SeaweedFsManager) SetMaxQps(qps int) {
m.maxQps = qps
}
func (m *SeaweedFsManager) newHandle(params seaweedFsManagerParams, fn seaweedFsManagerFn) (handle seaweedFsManagerHandle) {
return seaweedFsManagerHandle{
params: params,
fn: fn,
resChan: make(chan seaweedFsManagerResults),
}
}
func (m *SeaweedFsManager) start() {
for {
if m.closed {
return
}
handle := <-m.ch
go func() {
if err := backoff.Retry(func() error {
res := handle.fn(handle.params)
if res.err != nil {
return res.err
}
handle.resChan <- res
return nil
}, backoff.WithMaxRetries(
backoff.NewConstantBackOff(m.retryInterval), m.retryNum),
); err != nil {
handle.resChan <- m.error(err)
}
m.wait()
}()
}
}
func (m *SeaweedFsManager) process(params seaweedFsManagerParams, fn seaweedFsManagerFn) (res seaweedFsManagerResults) {
handle := m.newHandle(params, fn)
//log.Infof("handle: %v", handle)
m.ch <- handle
res = <-handle.resChan
return
}
func (m *SeaweedFsManager) error(err error) (res seaweedFsManagerResults) {
if err != nil {
trace.PrintError(err)
}
return seaweedFsManagerResults{err: err}
}
func (m *SeaweedFsManager) wait() {
ms := float32(1) / float32(m.maxQps) * 1e3
d := time.Duration(ms) * time.Millisecond
time.Sleep(d)
}
func (m *SeaweedFsManager) getCollectionAndTtlArgsFromParams(params seaweedFsManagerParams) (args []interface{}) {
if params.collection != "" {
args = append(args, params.collection)
}
if params.ttl != "" {
args = append(args, params.ttl)
}
return args
}
func (m *SeaweedFsManager) listDir(params seaweedFsManagerParams) (res seaweedFsManagerResults) {
var err error
if params.isRecursive {
res.files, err = m.ListDirRecursive(params.remotePath)
} else {
res.files, err = m.f.ListDir(params.remotePath)
}
if err != nil {
return m.error(err)
}
return
}
func (m *SeaweedFsManager) listDirRecursive(params seaweedFsManagerParams) (res seaweedFsManagerResults) {
entries, err := m.f.ListDir(params.remotePath)
if err != nil {
return m.error(err)
}
for _, file := range entries {
file = goseaweedfs.GetFileWithExtendedFields(file)
if file.IsDir {
file.Children, err = m.ListDirRecursive(file.FullPath)
if err != nil {
return m.error(err)
}
}
res.files = append(res.files, file)
}
return
}
func (m *SeaweedFsManager) uploadFile(params seaweedFsManagerParams) (res seaweedFsManagerResults) {
r, err := m.f.UploadFile(params.localPath, params.remotePath, params.collection, params.ttl)
if err != nil {
return m.error(err)
}
if r.Error != "" {
err = errors.New(r.Error)
return m.error(err)
}
return
}
func (m *SeaweedFsManager) uploadDir(params seaweedFsManagerParams) (res seaweedFsManagerResults) {
args := m.getCollectionAndTtlArgsFromParams(params)
if strings.HasSuffix(params.localPath, "/") {
params.localPath = params.localPath[:(len(params.localPath) - 1)]
}
if !strings.HasPrefix(params.remotePath, "/") {
params.remotePath = "/" + params.remotePath
}
files, err := goseaweedfs.ListFilesRecursive(params.localPath)
if err != nil {
return m.error(err)
}
for _, info := range files {
newFilePath := params.remotePath + strings.Replace(info.Path, params.localPath, "", -1)
if err := m.UploadFile(info.Path, newFilePath, args...); err != nil {
return m.error(err)
}
}
return
}
func (m *SeaweedFsManager) downloadFile(params seaweedFsManagerParams) (res seaweedFsManagerResults) {
err := m.f.Download(params.remotePath, params.urlValues, func(reader io.Reader) error {
data, err := ioutil.ReadAll(reader)
if err != nil {
return trace.TraceError(err)
}
dirPath := filepath.Dir(params.localPath)
_, err = os.Stat(dirPath)
if err != nil {
// if not exists, create a new directory
if err := os.MkdirAll(dirPath, DefaultDirMode); err != nil {
return trace.TraceError(err)
}
}
fileMode := DefaultFileMode
fileInfo, err := os.Stat(params.localPath)
if err == nil {
// if file already exists, save file mode and remove it
fileMode = fileInfo.Mode()
if err := os.Remove(params.localPath); err != nil {
return trace.TraceError(err)
}
}
if err := ioutil.WriteFile(params.localPath, data, fileMode); err != nil {
return trace.TraceError(err)
}
return nil
})
if err != nil {
return m.error(err)
}
return
}
func (m *SeaweedFsManager) downloadDir(params seaweedFsManagerParams) (res seaweedFsManagerResults) {
args := m.getCollectionAndTtlArgsFromParams(params)
var files []goseaweedfs.FilerFileInfo
files, res.err = m.ListDir(params.remotePath, true)
for _, file := range files {
if file.IsDir {
if err := m.DownloadDir(file.FullPath, path.Join(params.localPath, file.Name), args...); err != nil {
return m.error(err)
}
} else {
if err := m.DownloadFile(file.FullPath, path.Join(params.localPath, file.Name), args...); err != nil {
return m.error(err)
}
}
}
return
}
func (m *SeaweedFsManager) deleteFile(params seaweedFsManagerParams) (res seaweedFsManagerResults) {
if err := m.f.DeleteFile(params.remotePath); err != nil {
return m.error(err)
}
return
}
func (m *SeaweedFsManager) deleteDir(params seaweedFsManagerParams) (res seaweedFsManagerResults) {
if err := m.f.DeleteDir(params.remotePath); err != nil {
return m.error(err)
}
return
}
func (m *SeaweedFsManager) syncLocalToRemote(params seaweedFsManagerParams) (res seaweedFsManagerResults) {
// args
args := m.getCollectionAndTtlArgsFromParams(params)
// raise error if local path does not exist
if _, err := os.Stat(params.localPath); err != nil {
return m.error(err)
}
// get files and maps
localFiles, remoteFiles, localFilesMap, remoteFilesMap, err := getFilesAndFilesMaps(m.f, params.localPath, params.remotePath)
if err != nil {
return m.error(err)
}
// compare remote files with local files and delete files absent in local files
for _, remoteFile := range remoteFiles {
// attempt to get corresponding local file
_, ok := localFilesMap[remoteFile.FullPath]
if !ok {
// file does not exist on local, delete
if remoteFile.IsDir {
if err := m.DeleteDir(remoteFile.FullPath); err != nil {
return m.error(err)
}
} else {
if err := m.DeleteFile(remoteFile.FullPath); err != nil {
return m.error(err)
}
}
}
}
// compare local files with remote files and upload files with difference
for _, localFile := range localFiles {
// skip .git
if IsGitFile(localFile) {
continue
}
// corresponding remote file path
fileRemotePath := fmt.Sprintf("%s%s", params.remotePath, strings.Replace(localFile.Path, params.localPath, "", -1))
// attempt to get corresponding remote file
remoteFile, ok := remoteFilesMap[fileRemotePath]
if !ok {
// file does not exist on remote, upload
if err := m.UploadFile(localFile.Path, fileRemotePath, args...); err != nil {
return m.error(err)
}
} else {
// file exists on remote, upload if md5sum values are different
if remoteFile.Md5 != localFile.Md5 {
if err := m.UploadFile(localFile.Path, fileRemotePath, args...); err != nil {
return m.error(err)
}
}
}
}
return
}
func (m *SeaweedFsManager) syncRemoteToLocal(params seaweedFsManagerParams) (res seaweedFsManagerResults) {
// args
args := m.getCollectionAndTtlArgsFromParams(params)
// create directory if local path does not exist
if _, err := os.Stat(params.localPath); err != nil {
if err := os.MkdirAll(params.localPath, os.ModePerm); err != nil {
return m.error(err)
}
}
// get files and maps
localFiles, remoteFiles, localFilesMap, remoteFilesMap, err := getFilesAndFilesMaps(m.f, params.localPath, params.remotePath)
if err != nil {
return m.error(err)
}
// compare local files with remote files and delete files absent on remote
for _, localFile := range localFiles {
// skip .git
if IsGitFile(localFile) {
continue
}
// corresponding remote file path
fileRemotePath := fmt.Sprintf("%s%s", params.remotePath, strings.Replace(localFile.Path, params.localPath, "", -1))
// attempt to get corresponding remote file
_, ok := remoteFilesMap[fileRemotePath]
if !ok {
// file does not exist on remote, upload
if err := os.Remove(localFile.Path); err != nil {
return m.error(err)
}
}
}
// compare remote files with local files and download if files with difference
for _, remoteFile := range remoteFiles {
// directory
if remoteFile.IsDir {
localDirRelativePath := strings.Replace(remoteFile.FullPath, params.remotePath, "", 1)
localDirPath := fmt.Sprintf("%s%s", params.localPath, localDirRelativePath)
if err := m.SyncRemoteToLocal(remoteFile.FullPath, localDirPath); err != nil {
return m.error(err)
}
continue
}
// local file path
localFileRelativePath := strings.Replace(remoteFile.FullPath, params.remotePath, "", 1)
localFilePath := fmt.Sprintf("%s%s", params.localPath, localFileRelativePath)
// attempt to get corresponding local file
localFile, ok := localFilesMap[remoteFile.FullPath]
if !ok {
// file does not exist on local, download
if err := m.DownloadFile(remoteFile.FullPath, localFilePath); err != nil {
return m.error(err)
}
} else {
// file exists on remote, download if md5sum values are different
if remoteFile.Md5 != localFile.Md5 {
if err := m.DownloadFile(remoteFile.FullPath, localFilePath, args...); err != nil {
return m.error(err)
}
}
}
}
return
}
func (m *SeaweedFsManager) getFile(params seaweedFsManagerParams) (res seaweedFsManagerResults) {
var buf bytes.Buffer
res.err = m.f.Download(params.remotePath, params.urlValues, func(reader io.Reader) error {
_, err := io.Copy(&buf, reader)
if err != nil {
return trace.TraceError(err)
}
return nil
})
res.data = buf.Bytes()
return
}
func (m *SeaweedFsManager) getFileInfo(params seaweedFsManagerParams) (res seaweedFsManagerResults) {
arr := strings.Split(params.remotePath, "/")
dirName := strings.Join(arr[:(len(arr)-1)], "/")
files, err := m.f.ListDir(dirName)
if err != nil {
return m.error(err)
}
for _, f := range files {
if f.FullPath == params.remotePath {
res.file = &f
return
}
}
return m.error(ErrorFsNotExists)
}
func (m *SeaweedFsManager) updateFile(params seaweedFsManagerParams) (res seaweedFsManagerResults) {
tmpRootDir := os.TempDir()
tmpDirPath := path.Join(tmpRootDir, ".seaweedfs")
if _, err := os.Stat(tmpDirPath); err != nil {
if err := os.MkdirAll(tmpDirPath, os.ModePerm); err != nil {
return m.error(err)
}
}
tmpFilePath := path.Join(tmpDirPath, fmt.Sprintf(".%s", uuid.New().String()))
if _, err := os.Stat(tmpFilePath); err == nil {
if err := os.Remove(tmpFilePath); err != nil {
return m.error(err)
}
}
if err := ioutil.WriteFile(tmpFilePath, params.data, os.ModePerm); err != nil {
return m.error(err)
}
params2 := seaweedFsManagerParams{
localPath: tmpFilePath,
remotePath: params.remotePath,
collection: params.collection,
ttl: params.ttl,
}
if res := m.uploadFile(params2); res.err != nil {
return m.error(res.err)
}
if err := os.Remove(tmpFilePath); err != nil {
return m.error(err)
}
return
}
func NewSeaweedFsManager(opts ...Option) (m2 Manager, err error) {
// manager
m := &SeaweedFsManager{
filerUrl: "http://localhost:8888",
timeout: 5 * time.Minute,
workerNum: 1,
retryInterval: 500 * time.Millisecond,
retryNum: 3,
maxQps: 5,
q: linkedlistqueue.New(),
}
// apply options
for _, opt := range opts {
opt(m)
}
// initialize
if err := m.Init(); err != nil {
return nil, err
}
return m, nil
}
var _seaweedFsManager Manager
func GetSeaweedFsManager(opts ...Option) (m2 Manager, err error) {
if _seaweedFsManager == nil {
_seaweedFsManager, err = NewSeaweedFsManager(opts...)
if err != nil {
return nil, err
}
}
return _seaweedFsManager, nil
}