mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-23 17:31:11 +01:00
358 lines
9.8 KiB
Go
358 lines
9.8 KiB
Go
package mgo
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/globalsign/mgo/bson"
|
|
)
|
|
|
|
// FullDocument is the string type of ChangeStreamOptions.FullDocument, which
// controls how much data the server returns with each change document.
type FullDocument string

// Values that may be assigned to ChangeStreamOptions.FullDocument. They are
// untyped string constants, so they are assignable to FullDocument as well as
// to plain strings.
const (
	// Default is the "default" fullDocument setting.
	Default = "default"

	// UpdateLookup is the "updateLookup" fullDocument setting.
	UpdateLookup = "updateLookup"
)
|
|
|
|
type ChangeStream struct {
|
|
iter *Iter
|
|
isClosed bool
|
|
options ChangeStreamOptions
|
|
pipeline interface{}
|
|
resumeToken *bson.Raw
|
|
collection *Collection
|
|
readPreference *ReadPreference
|
|
err error
|
|
m sync.Mutex
|
|
sessionCopied bool
|
|
}
|
|
|
|
type ChangeStreamOptions struct {
|
|
|
|
// FullDocument controls the amount of data that the server will return when
|
|
// returning a changes document.
|
|
FullDocument FullDocument
|
|
|
|
// ResumeAfter specifies the logical starting point for the new change stream.
|
|
ResumeAfter *bson.Raw
|
|
|
|
// MaxAwaitTimeMS specifies the maximum amount of time for the server to wait
|
|
// on new documents to satisfy a change stream query.
|
|
MaxAwaitTimeMS time.Duration
|
|
|
|
// BatchSize specifies the number of documents to return per batch.
|
|
BatchSize int
|
|
|
|
// Collation specifies the way the server should collate returned data.
|
|
//TODO Collation *Collation
|
|
}
|
|
|
|
// errMissingResumeToken reports that a change document had no _id field to
// use as a resume token. isResumableError treats it as non-resumable.
var errMissingResumeToken = errors.New("resume token missing from result")
|
|
|
|
// Watch constructs a new ChangeStream capable of receiving continuing data
|
|
// from the database.
|
|
func (coll *Collection) Watch(pipeline interface{},
|
|
options ChangeStreamOptions) (*ChangeStream, error) {
|
|
|
|
if pipeline == nil {
|
|
pipeline = []bson.M{}
|
|
}
|
|
|
|
csPipe := constructChangeStreamPipeline(pipeline, options)
|
|
pipe := coll.Pipe(&csPipe)
|
|
if options.MaxAwaitTimeMS > 0 {
|
|
pipe.SetMaxTime(options.MaxAwaitTimeMS)
|
|
}
|
|
if options.BatchSize > 0 {
|
|
pipe.Batch(options.BatchSize)
|
|
}
|
|
pIter := pipe.Iter()
|
|
|
|
// check that there was no issue creating the iterator.
|
|
// this will fail immediately with an error from the server if running against
|
|
// a standalone.
|
|
if err := pIter.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pIter.isChangeStream = true
|
|
return &ChangeStream{
|
|
iter: pIter,
|
|
collection: coll,
|
|
resumeToken: nil,
|
|
options: options,
|
|
pipeline: pipeline,
|
|
}, nil
|
|
}
|
|
|
|
// Next retrieves the next document from the change stream, blocking if necessary.
|
|
// Next returns true if a document was successfully unmarshalled into result,
|
|
// and false if an error occured. When Next returns false, the Err method should
|
|
// be called to check what error occurred during iteration. If there were no events
|
|
// available (ErrNotFound), the Err method returns nil so the user can retry the invocaton.
|
|
//
|
|
// For example:
|
|
//
|
|
// pipeline := []bson.M{}
|
|
//
|
|
// changeStream := collection.Watch(pipeline, ChangeStreamOptions{})
|
|
// for changeStream.Next(&changeDoc) {
|
|
// fmt.Printf("Change: %v\n", changeDoc)
|
|
// }
|
|
//
|
|
// if err := changeStream.Close(); err != nil {
|
|
// return err
|
|
// }
|
|
//
|
|
// If the pipeline used removes the _id field from the result, Next will error
|
|
// because the _id field is needed to resume iteration when an error occurs.
|
|
//
|
|
func (changeStream *ChangeStream) Next(result interface{}) bool {
|
|
// the err field is being constantly overwritten and we don't want the user to
|
|
// attempt to read it at this point so we lock.
|
|
changeStream.m.Lock()
|
|
|
|
defer changeStream.m.Unlock()
|
|
|
|
// if we are in a state of error, then don't continue.
|
|
if changeStream.err != nil {
|
|
return false
|
|
}
|
|
|
|
if changeStream.isClosed {
|
|
changeStream.err = fmt.Errorf("illegal use of a closed ChangeStream")
|
|
return false
|
|
}
|
|
|
|
var err error
|
|
|
|
// attempt to fetch the change stream result.
|
|
err = changeStream.fetchResultSet(result)
|
|
if err == nil {
|
|
return true
|
|
}
|
|
|
|
// if we get no results we return false with no errors so the user can call Next
|
|
// again, resuming is not needed as the iterator is simply timed out as no events happened.
|
|
// The user will call Timeout in order to understand if this was the case.
|
|
if err == ErrNotFound {
|
|
return false
|
|
}
|
|
|
|
// check if the error is resumable
|
|
if !isResumableError(err) {
|
|
// error is not resumable, give up and return it to the user.
|
|
changeStream.err = err
|
|
return false
|
|
}
|
|
|
|
// try to resume.
|
|
err = changeStream.resume()
|
|
if err != nil {
|
|
// we've not been able to successfully resume and should only try once,
|
|
// so we give up.
|
|
changeStream.err = err
|
|
return false
|
|
}
|
|
|
|
// we've successfully resumed the changestream.
|
|
// try to fetch the next result.
|
|
err = changeStream.fetchResultSet(result)
|
|
if err != nil {
|
|
changeStream.err = err
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// Err returns nil if no errors happened during iteration, or the actual
|
|
// error otherwise.
|
|
func (changeStream *ChangeStream) Err() error {
|
|
changeStream.m.Lock()
|
|
defer changeStream.m.Unlock()
|
|
return changeStream.err
|
|
}
|
|
|
|
// Close kills the server cursor used by the iterator, if any, and returns
|
|
// nil if no errors happened during iteration, or the actual error otherwise.
|
|
func (changeStream *ChangeStream) Close() error {
|
|
changeStream.m.Lock()
|
|
defer changeStream.m.Unlock()
|
|
changeStream.isClosed = true
|
|
err := changeStream.iter.Close()
|
|
if err != nil {
|
|
changeStream.err = err
|
|
}
|
|
if changeStream.sessionCopied {
|
|
changeStream.iter.session.Close()
|
|
changeStream.sessionCopied = false
|
|
}
|
|
return err
|
|
}
|
|
|
|
// ResumeToken returns a copy of the current resume token held by the change stream.
|
|
// This token should be treated as an opaque token that can be provided to instantiate
|
|
// a new change stream.
|
|
func (changeStream *ChangeStream) ResumeToken() *bson.Raw {
|
|
changeStream.m.Lock()
|
|
defer changeStream.m.Unlock()
|
|
if changeStream.resumeToken == nil {
|
|
return nil
|
|
}
|
|
var tokenCopy = *changeStream.resumeToken
|
|
return &tokenCopy
|
|
}
|
|
|
|
// Timeout returns true if the last call of Next returned false because of an iterator timeout.
|
|
func (changeStream *ChangeStream) Timeout() bool {
|
|
return changeStream.iter.Timeout()
|
|
}
|
|
|
|
func constructChangeStreamPipeline(pipeline interface{},
|
|
options ChangeStreamOptions) interface{} {
|
|
pipelinev := reflect.ValueOf(pipeline)
|
|
|
|
// ensure that the pipeline passed in is a slice.
|
|
if pipelinev.Kind() != reflect.Slice {
|
|
panic("pipeline argument must be a slice")
|
|
}
|
|
|
|
// construct the options to be used by the change notification
|
|
// pipeline stage.
|
|
changeStreamStageOptions := bson.M{}
|
|
|
|
if options.FullDocument != "" {
|
|
changeStreamStageOptions["fullDocument"] = options.FullDocument
|
|
}
|
|
if options.ResumeAfter != nil {
|
|
changeStreamStageOptions["resumeAfter"] = options.ResumeAfter
|
|
}
|
|
|
|
changeStreamStage := bson.M{"$changeStream": changeStreamStageOptions}
|
|
|
|
pipeOfInterfaces := make([]interface{}, pipelinev.Len()+1)
|
|
|
|
// insert the change notification pipeline stage at the beginning of the
|
|
// aggregation.
|
|
pipeOfInterfaces[0] = changeStreamStage
|
|
|
|
// convert the passed in slice to a slice of interfaces.
|
|
for i := 0; i < pipelinev.Len(); i++ {
|
|
pipeOfInterfaces[1+i] = pipelinev.Index(i).Addr().Interface()
|
|
}
|
|
var pipelineAsInterface interface{} = pipeOfInterfaces
|
|
return pipelineAsInterface
|
|
}
|
|
|
|
func (changeStream *ChangeStream) resume() error {
|
|
// copy the information for the new socket.
|
|
|
|
// Thanks to Copy() future uses will acquire a new socket against the newly selected DB.
|
|
newSession := changeStream.iter.session.Copy()
|
|
|
|
// fetch the cursor from the iterator and use it to run a killCursors
|
|
// on the connection.
|
|
cursorId := changeStream.iter.op.cursorId
|
|
err := runKillCursorsOnSession(newSession, cursorId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// change out the old connection to the database with the new connection.
|
|
if changeStream.sessionCopied {
|
|
changeStream.collection.Database.Session.Close()
|
|
}
|
|
changeStream.collection.Database.Session = newSession
|
|
changeStream.sessionCopied = true
|
|
|
|
opts := changeStream.options
|
|
if changeStream.resumeToken != nil {
|
|
opts.ResumeAfter = changeStream.resumeToken
|
|
}
|
|
// make a new pipeline containing the resume token.
|
|
changeStreamPipeline := constructChangeStreamPipeline(changeStream.pipeline, opts)
|
|
|
|
// generate the new iterator with the new connection.
|
|
newPipe := changeStream.collection.Pipe(changeStreamPipeline)
|
|
changeStream.iter = newPipe.Iter()
|
|
if err := changeStream.iter.Err(); err != nil {
|
|
return err
|
|
}
|
|
changeStream.iter.isChangeStream = true
|
|
return nil
|
|
}
|
|
|
|
// fetchResumeToken unmarshals the _id field from the document, setting an error
|
|
// on the changeStream if it is unable to.
|
|
func (changeStream *ChangeStream) fetchResumeToken(rawResult *bson.Raw) error {
|
|
changeStreamResult := struct {
|
|
ResumeToken *bson.Raw `bson:"_id,omitempty"`
|
|
}{}
|
|
|
|
err := rawResult.Unmarshal(&changeStreamResult)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if changeStreamResult.ResumeToken == nil {
|
|
return errMissingResumeToken
|
|
}
|
|
|
|
changeStream.resumeToken = changeStreamResult.ResumeToken
|
|
return nil
|
|
}
|
|
|
|
func (changeStream *ChangeStream) fetchResultSet(result interface{}) error {
|
|
rawResult := bson.Raw{}
|
|
|
|
// fetch the next set of documents from the cursor.
|
|
gotNext := changeStream.iter.Next(&rawResult)
|
|
err := changeStream.iter.Err()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !gotNext && err == nil {
|
|
// If the iter.Err() method returns nil despite us not getting a next batch,
|
|
// it is becuase iter.Err() silences this case.
|
|
return ErrNotFound
|
|
}
|
|
|
|
// grab the resumeToken from the results
|
|
if err := changeStream.fetchResumeToken(&rawResult); err != nil {
|
|
return err
|
|
}
|
|
|
|
// put the raw results into the data structure the user provided.
|
|
if err := rawResult.Unmarshal(result); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func isResumableError(err error) bool {
|
|
_, isQueryError := err.(*QueryError)
|
|
// if it is not a database error OR it is a database error,
|
|
// but the error is a notMaster error
|
|
//and is not a missingResumeToken error (caused by the user provided pipeline)
|
|
return (!isQueryError || isNotMasterError(err)) && (err != errMissingResumeToken)
|
|
}
|
|
|
|
func runKillCursorsOnSession(session *Session, cursorId int64) error {
|
|
socket, err := session.acquireSocket(true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = socket.Query(&killCursorsOp{[]int64{cursorId}})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
socket.Release()
|
|
|
|
return nil
|
|
}
|