// mgo - MongoDB driver for Go
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
//    list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
//    this list of conditions and the following disclaimer in the documentation
//    and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

package mgo

import (
	"errors"
	"net"
	"sort"
	"sync"
	"time"

	"github.com/globalsign/mgo/bson"
)

// coarseTime is used to amortise the cost of querying the timecounter
// (possibly incurring a syscall too) when setting a socket.lastTimeUsed value,
// which happens frequently on the hot path.
//
// The lastTimeUsed value may be skewed by up to 25ms (see coarseTimeProvider).
var coarseTime *coarseTimeProvider

func init() {
	coarseTime = newcoarseTimeProvider(25 * time.Millisecond)
}
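
// For reference, a provider like the one assumed above can be built from a
// ticker and an atomic value. This is only a sketch under the assumption that
// coarseTimeProvider (defined elsewhere in this package) exposes a Now method;
// the real implementation may differ:
//
//	type coarseTimeProvider struct {
//		now atomic.Value // holds a time.Time, refreshed every granularity
//	}
//
//	func newcoarseTimeProvider(granularity time.Duration) *coarseTimeProvider {
//		p := &coarseTimeProvider{}
//		p.now.Store(time.Now())
//		go func() {
//			for t := range time.Tick(granularity) {
//				p.now.Store(t) // one timecounter query per tick, amortised
//			}
//		}()
//		return p
//	}
//
//	// Now returns the cached time, stale by at most one granularity interval.
//	func (p *coarseTimeProvider) Now() time.Time {
//		return p.now.Load().(time.Time)
//	}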

// ---------------------------------------------------------------------------
// Mongo server encapsulation.

type mongoServer struct {
	sync.RWMutex
	Addr          string
	ResolvedAddr  string
	tcpaddr       *net.TCPAddr
	unusedSockets []*mongoSocket
	liveSockets   []*mongoSocket
	sync          chan bool
	dial          dialer
	pingValue     time.Duration
	pingIndex     int
	pingWindow    [6]time.Duration
	info          *mongoServerInfo
	pingCount     uint32
	closed        bool
	abended       bool
	poolWaiter    *sync.Cond
	dialInfo      *DialInfo
}

type dialer struct {
	old func(addr net.Addr) (net.Conn, error)
	new func(addr *ServerAddr) (net.Conn, error)
}

func (dial dialer) isSet() bool {
	return dial.old != nil || dial.new != nil
}

type mongoServerInfo struct {
	Master         bool
	Mongos         bool
	Tags           bson.D
	MaxWireVersion int
	SetName        string
}

var defaultServerInfo mongoServerInfo

func newServer(addr string, tcpaddr *net.TCPAddr, syncChan chan bool, dial dialer, info *DialInfo) *mongoServer {
	server := &mongoServer{
		Addr:         addr,
		ResolvedAddr: tcpaddr.String(),
		tcpaddr:      tcpaddr,
		sync:         syncChan,
		dial:         dial,
		info:         &defaultServerInfo,
		pingValue:    time.Hour, // Push it back before an actual ping.
		dialInfo:     info,
	}
	server.poolWaiter = sync.NewCond(server)
	go server.pinger(true)
	if info.MaxIdleTimeMS != 0 {
		go server.poolShrinker()
	}
	return server
}

var errPoolLimit = errors.New("per-server connection limit reached")
var errPoolTimeout = errors.New("could not acquire connection within pool timeout")
var errServerClosed = errors.New("server was closed")

// AcquireSocket returns a socket for communicating with the server.
// This will attempt to reuse an old connection, if one is available. Otherwise,
// it will establish a new one. The returned socket is owned by the call site,
// and will return to the cache when the socket has its Release method called
// the same number of times as AcquireSocket + Acquire were called for it.
// If info.PoolLimit is greater than zero and the number of sockets in use
// on this server has reached that limit, errPoolLimit is returned.
func (server *mongoServer) AcquireSocket(info *DialInfo) (socket *mongoSocket, abended bool, err error) {
	return server.acquireSocketInternal(info, false)
}

// AcquireSocketWithBlocking wraps AcquireSocket, but if a socket is not available, it will _not_
// return errPoolLimit. Instead, it blocks waiting for a socket to become available. If
// info.PoolTimeout elapses before a socket becomes available, it returns errPoolTimeout.
func (server *mongoServer) AcquireSocketWithBlocking(info *DialInfo) (socket *mongoSocket, abended bool, err error) {
	return server.acquireSocketInternal(info, true)
}
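
// A typical call-site pattern, as a sketch (not code from this file): acquire,
// use, then Release to return the socket to the pool.
//
//	socket, _, err := server.AcquireSocketWithBlocking(info)
//	if err != nil {
//		return err // e.g. errPoolTimeout or errServerClosed
//	}
//	defer socket.Release()
//	// ... issue operations on socket ...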

func (server *mongoServer) acquireSocketInternal(info *DialInfo, shouldBlock bool) (socket *mongoSocket, abended bool, err error) {
	for {
		server.Lock()
		abended = server.abended
		if server.closed {
			server.Unlock()
			return nil, abended, errServerClosed
		}
		if info.PoolLimit > 0 {
			if shouldBlock {
				// Go's sync.Cond has no WaitWithTimeout, so the timeout is
				// implemented with a wait plus a broadcast: the broadcast makes
				// the loop below re-check the deadline and fail if it is blown.
				// This produces spurious wakeups, but a directed signal would
				// need one condition variable per waiter, which would in turn
				// require a loop traversal in the RecycleSocket method.
				// We also can't use the approach of turning a condition
				// variable into a channel outlined in
				// https://github.com/golang/go/issues/16620, since the lock
				// needs to be held in _this_ goroutine.
				waitDone := make(chan struct{})
				timeoutHit := false
				if info.PoolTimeout > 0 {
					go func() {
						select {
						case <-waitDone:
						case <-time.After(info.PoolTimeout):
							// timeoutHit is part of the wait condition, so it
							// needs to be changed under the mutex.
							server.Lock()
							defer server.Unlock()
							timeoutHit = true
							server.poolWaiter.Broadcast()
						}
					}()
				}
				timeSpentWaiting := time.Duration(0)
				for len(server.liveSockets)-len(server.unusedSockets) >= info.PoolLimit && !timeoutHit {
					// Only time spent in Wait() is counted, not time spent
					// evaluating the loop condition, so that on the happy
					// non-blocking path, where the condition above is true on
					// the first pass, a round zero wait time is recorded.
					waitStart := time.Now()
					// Wait unlocks the server mutex, waits, and locks it
					// again. Thus, the condition above is only ever evaluated
					// with the lock held.
					server.poolWaiter.Wait()
					timeSpentWaiting += time.Since(waitStart)
				}
				close(waitDone)
				if timeoutHit {
					server.Unlock()
					stats.noticePoolTimeout(timeSpentWaiting)
					return nil, false, errPoolTimeout
				}
				// Record that a connection was fetched out of the socket list,
				// and how long the wait took.
				stats.noticeSocketAcquisition(timeSpentWaiting)
			} else {
				if len(server.liveSockets)-len(server.unusedSockets) >= info.PoolLimit {
					server.Unlock()
					return nil, false, errPoolLimit
				}
			}
		}
		n := len(server.unusedSockets)
		if n > 0 {
			socket = server.unusedSockets[n-1]
			server.unusedSockets[n-1] = nil // Help GC.
			server.unusedSockets = server.unusedSockets[:n-1]
			serverInfo := server.info
			server.Unlock()
			err = socket.InitialAcquire(serverInfo, info)
			if err != nil {
				continue
			}
		} else {
			server.Unlock()
			socket, err = server.Connect(info)
			if err == nil {
				server.Lock()
				// The Connect call released the lock; check whether the
				// server got closed in the meantime.
				if server.closed {
					server.Unlock()
					socket.Release()
					socket.Close()
					return nil, abended, errServerClosed
				}
				server.liveSockets = append(server.liveSockets, socket)
				server.Unlock()
			}
		}
		return
	}
}
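
// The timeout handling above generalises to any sync.Cond, since Go offers no
// WaitWithTimeout. A minimal standalone sketch (the names here are
// illustrative, not part of this package); the caller must hold c.L on entry:
//
//	func waitWithTimeout(c *sync.Cond, d time.Duration, ready func() bool) bool {
//		done := make(chan struct{})
//		timedOut := false
//		go func() {
//			select {
//			case <-done:
//			case <-time.After(d):
//				c.L.Lock()
//				timedOut = true // part of the wait condition, so set under the lock
//				c.Broadcast()   // wake the waiter so it observes the timeout
//				c.L.Unlock()
//			}
//		}()
//		for !ready() && !timedOut {
//			c.Wait() // releases c.L while waiting, re-acquires before returning
//		}
//		close(done)
//		return !timedOut
//	}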

// Connect establishes a new connection to the server. This should
// generally be done through server.AcquireSocket().
func (server *mongoServer) Connect(info *DialInfo) (*mongoSocket, error) {
	server.RLock()
	master := server.info.Master
	dial := server.dial
	server.RUnlock()

	logf("Establishing new connection to %s (timeout=%s)...", server.Addr, info.Timeout)
	var conn net.Conn
	var err error
	switch {
	case !dial.isSet():
		conn, err = net.DialTimeout("tcp", server.ResolvedAddr, info.Timeout)
		if tcpconn, ok := conn.(*net.TCPConn); ok {
			tcpconn.SetKeepAlive(true)
		} else if err == nil {
			panic("internal error: obtained TCP connection is not a *net.TCPConn!?")
		}
	case dial.old != nil:
		conn, err = dial.old(server.tcpaddr)
	case dial.new != nil:
		conn, err = dial.new(&ServerAddr{server.Addr, server.tcpaddr})
	default:
		panic("dialer is set, but both dial.old and dial.new are nil")
	}
	if err != nil {
		logf("Connection to %s failed: %v", server.Addr, err.Error())
		return nil, err
	}
	logf("Connection to %s established.", server.Addr)

	stats.conn(+1, master)
	return newSocket(server, conn, info), nil
}
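
// The dial.new hook corresponds to DialInfo.DialServer in the public API. For
// instance, a TLS dialer is typically wired in like this (a sketch; the
// tls.Config contents are up to the caller, and crypto/tls must be imported):
//
//	info.DialServer = func(addr *ServerAddr) (net.Conn, error) {
//		return tls.Dial("tcp", addr.String(), &tls.Config{})
//	}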

// Close forces closing all sockets that are alive, whether
// they're currently in use or not.
func (server *mongoServer) Close() {
	server.close(false)
}

// CloseIdle closes all sockets that are currently idle; sockets that are
// in use will be closed once they become idle.
func (server *mongoServer) CloseIdle() {
	server.close(true)
}

func (server *mongoServer) close(waitForIdle bool) {
	server.Lock()
	server.closed = true
	liveSockets := server.liveSockets
	unusedSockets := server.unusedSockets
	server.liveSockets = nil
	server.unusedSockets = nil
	server.Unlock()
	logf("Connections to %s closing (%d live sockets).", server.Addr, len(liveSockets))
	for i, s := range liveSockets {
		if waitForIdle {
			s.CloseAfterIdle()
		} else {
			s.Close()
		}
		liveSockets[i] = nil
	}
	for i := range unusedSockets {
		unusedSockets[i] = nil
	}
}

// RecycleSocket puts socket back into the unused cache.
func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
	server.Lock()
	if !server.closed {
		socket.lastTimeUsed = coarseTime.Now() // A rough approximation of the current time - see coarseTime.
		server.unusedSockets = append(server.unusedSockets, socket)
	}
	// If anybody is waiting for a connection, they should try now.
	// Note that this _has_ to be a broadcast, not a signal: the signatures of
	// AcquireSocket and AcquireSocketWithBlocking let each caller specify its
	// own maximum number of connections, rather than that being an intrinsic
	// property of the connection pool (presumably to ensure that there is
	// always a connection available for replset topology discovery). Thus,
	// once a connection is returned to the pool, _every_ waiter needs to check
	// whether the connection count is below its particular value of poolLimit.
	server.poolWaiter.Broadcast()
	server.Unlock()
}

func removeSocket(sockets []*mongoSocket, socket *mongoSocket) []*mongoSocket {
	for i, s := range sockets {
		if s == socket {
			copy(sockets[i:], sockets[i+1:])
			n := len(sockets) - 1
			sockets[n] = nil
			sockets = sockets[:n]
			break
		}
	}
	return sockets
}

// AbendSocket notifies the server that the given socket has terminated
// abnormally, and thus should be discarded rather than cached.
func (server *mongoServer) AbendSocket(socket *mongoSocket) {
	server.Lock()
	server.abended = true
	if server.closed {
		server.Unlock()
		return
	}
	server.liveSockets = removeSocket(server.liveSockets, socket)
	server.unusedSockets = removeSocket(server.unusedSockets, socket)
	server.Unlock()
	// Maybe just a timeout, but suggest a cluster sync up just in case.
	select {
	case server.sync <- true:
	default:
	}
}

func (server *mongoServer) SetInfo(info *mongoServerInfo) {
	server.Lock()
	server.info = info
	server.Unlock()
}

func (server *mongoServer) Info() *mongoServerInfo {
	server.Lock()
	info := server.info
	server.Unlock()
	return info
}

// hasTags reports whether the server matches at least one of the given tag
// sets; every tag within a set must be present on the server for that set
// to match.
func (server *mongoServer) hasTags(serverTags []bson.D) bool {
NextTagSet:
	for _, tags := range serverTags {
	NextReqTag:
		for _, req := range tags {
			for _, has := range server.info.Tags {
				if req.Name == has.Name {
					if req.Value == has.Value {
						continue NextReqTag
					}
					continue NextTagSet
				}
			}
			continue NextTagSet
		}
		return true
	}
	return false
}
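
// For illustration (the tag values are hypothetical): with
//
//	serverTags := []bson.D{
//		{{Name: "dc", Value: "east"}, {Name: "rack", Value: "1"}},
//		{{Name: "dc", Value: "west"}},
//	}
//
// a server matches if it carries both dc=east and rack=1, or if it carries
// dc=west: tag sets are ORed together, while tags within a set are ANDed.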

var pingDelay = 15 * time.Second

func (server *mongoServer) pinger(loop bool) {
	var delay time.Duration
	if raceDetector {
		// pingDelay is only ever mutated by tests, so the unsynchronised read
		// in the else branch is fine in practice; take the lock here just to
		// keep the race detector happy.
		globalMutex.Lock()
		delay = pingDelay
		globalMutex.Unlock()
	} else {
		delay = pingDelay
	}
	op := queryOp{
		collection: "admin.$cmd",
		query:      bson.D{{Name: "ping", Value: 1}},
		flags:      flagSlaveOk,
		limit:      -1,
	}
	for {
		if loop {
			time.Sleep(delay)
		}
		op := op

		socket, _, err := server.AcquireSocket(server.dialInfo)
		if err == nil {
			start := time.Now()
			_, _ = socket.SimpleQuery(&op)
			elapsed := time.Since(start)

			server.pingWindow[server.pingIndex] = elapsed
			server.pingIndex = (server.pingIndex + 1) % len(server.pingWindow)
			server.pingCount++
			var max time.Duration
			for i := 0; i < len(server.pingWindow) && uint32(i) < server.pingCount; i++ {
				if server.pingWindow[i] > max {
					max = server.pingWindow[i]
				}
			}
			socket.Release()
			server.Lock()
			if server.closed {
				loop = false
			}
			server.pingValue = max
			server.Unlock()
			logf("Ping for %s is %d ms", server.Addr, max/time.Millisecond)
		} else if err == errServerClosed {
			return
		}
		if !loop {
			return
		}
	}
}
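
// The ping window above smooths pingValue by keeping the six most recent
// samples and publishing their maximum. For example, with window contents of
// 12ms, 15ms, 9ms, 11ms, 10ms, and 13ms, pingValue becomes 15ms: a single
// fast outlier cannot make a distant server look near, while a slow sample
// ages out after six pings.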

func (server *mongoServer) poolShrinker() {
	ticker := time.NewTicker(1 * time.Minute)
	for range ticker.C {
		if server.closed {
			ticker.Stop()
			return
		}
		server.Lock()
		unused := len(server.unusedSockets)
		if unused < server.dialInfo.MinPoolSize {
			server.Unlock()
			continue
		}
		now := time.Now()
		end := 0
		reclaimMap := map[*mongoSocket]struct{}{}
		// Because acquisition and recycling both happen at the tail of the
		// slice, the head is always the oldest unused socket.
		for _, s := range server.unusedSockets[:unused-server.dialInfo.MinPoolSize] {
			if s.lastTimeUsed.Add(time.Duration(server.dialInfo.MaxIdleTimeMS) * time.Millisecond).After(now) {
				break
			}
			end++
			reclaimMap[s] = struct{}{}
		}
		tbr := server.unusedSockets[:end]
		if end > 0 {
			next := make([]*mongoSocket, unused-end)
			copy(next, server.unusedSockets[end:])
			server.unusedSockets = next
			remainSockets := []*mongoSocket{}
			for _, s := range server.liveSockets {
				if _, ok := reclaimMap[s]; !ok {
					remainSockets = append(remainSockets, s)
				}
			}
			server.liveSockets = remainSockets
			stats.conn(-1*end, server.info.Master)
		}
		server.Unlock()

		for _, s := range tbr {
			s.Close()
		}
	}
}
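
// Shrinking only activates when DialInfo.MaxIdleTimeMS is non-zero (see
// newServer). For example (hypothetical values), a pool configured with
// MinPoolSize=5 and MaxIdleTimeMS=30000 keeps at least five idle sockets and
// reclaims any others that have sat unused for more than thirty seconds.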

type mongoServerSlice []*mongoServer

func (s mongoServerSlice) Len() int {
	return len(s)
}

func (s mongoServerSlice) Less(i, j int) bool {
	return s[i].ResolvedAddr < s[j].ResolvedAddr
}

func (s mongoServerSlice) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

func (s mongoServerSlice) Sort() {
	sort.Sort(s)
}

func (s mongoServerSlice) Search(resolvedAddr string) (i int, ok bool) {
	n := len(s)
	i = sort.Search(n, func(i int) bool {
		return s[i].ResolvedAddr >= resolvedAddr
	})
	return i, i != n && s[i].ResolvedAddr == resolvedAddr
}
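
// Search relies on the slice being kept sorted by ResolvedAddr (see Add
// below). For example, with resolved addresses ["10.0.0.1:27017",
// "10.0.0.2:27017"], Search("10.0.0.2:27017") returns (1, true), while
// Search("10.0.0.3:27017") returns (2, false).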

type mongoServers struct {
	slice mongoServerSlice
}

func (servers *mongoServers) Search(resolvedAddr string) (server *mongoServer) {
	if i, ok := servers.slice.Search(resolvedAddr); ok {
		return servers.slice[i]
	}
	return nil
}

func (servers *mongoServers) Add(server *mongoServer) {
	servers.slice = append(servers.slice, server)
	servers.slice.Sort()
}

func (servers *mongoServers) Remove(other *mongoServer) (server *mongoServer) {
	if i, found := servers.slice.Search(other.ResolvedAddr); found {
		server = servers.slice[i]
		copy(servers.slice[i:], servers.slice[i+1:])
		n := len(servers.slice) - 1
		servers.slice[n] = nil // Help GC.
		servers.slice = servers.slice[:n]
	}
	return
}

func (servers *mongoServers) Slice() []*mongoServer {
	return ([]*mongoServer)(servers.slice)
}

func (servers *mongoServers) Get(i int) *mongoServer {
	return servers.slice[i]
}

func (servers *mongoServers) Len() int {
	return len(servers.slice)
}

func (servers *mongoServers) Empty() bool {
	return len(servers.slice) == 0
}

func (servers *mongoServers) HasMongos() bool {
	for _, s := range servers.slice {
		if s.Info().Mongos {
			return true
		}
	}
	return false
}

// BestFit returns the best guess of what would be the most interesting
// server to perform operations on at this point in time.
func (servers *mongoServers) BestFit(mode Mode, serverTags []bson.D) *mongoServer {
	var best *mongoServer
	for _, next := range servers.slice {
		if best == nil {
			best = next
			best.RLock()
			if len(serverTags) != 0 && !next.info.Mongos && !best.hasTags(serverTags) {
				best.RUnlock()
				best = nil
			}
			continue
		}
		next.RLock()
		swap := false
		switch {
		case len(serverTags) != 0 && !next.info.Mongos && !next.hasTags(serverTags):
			// Must have requested tags.
		case mode == Secondary && next.info.Master && !next.info.Mongos:
			// Must be a secondary or mongos.
		case next.info.Master != best.info.Master && mode != Nearest:
			// Prefer slaves, unless the mode is PrimaryPreferred.
			swap = (mode == PrimaryPreferred) != best.info.Master
		case absDuration(next.pingValue-best.pingValue) > 15*time.Millisecond:
			// Prefer the nearest server.
			swap = next.pingValue < best.pingValue
		case len(next.liveSockets)-len(next.unusedSockets) < len(best.liveSockets)-len(best.unusedSockets):
			// Prefer servers with fewer connections in use.
			swap = true
		}
		if swap {
			best.RUnlock()
			best = next
		} else {
			next.RUnlock()
		}
	}
	if best != nil {
		best.RUnlock()
	}
	return best
}

func absDuration(d time.Duration) time.Duration {
	if d < 0 {
		return -d
	}
	return d
}