不要怂,就是干,撸起袖子干!

connector-manager.js 6.02 KB
"use strict";

var Query                = require("./query")
  , Utils                = require("../../utils")

module.exports = (function() {
  var ConnectorManager = function(sequelize, config) {
    var pgModule = config.dialectModulePath || 'pg'

    this.sequelize = sequelize
    this.client         = null
    this.config         = config || {}
    this.config.port    = this.config.port || 5432
    this.pooling        = (!!this.config.pool && (this.config.pool.maxConnections > 0))
    this.pg             = this.config.native ? require(pgModule).native : require(pgModule)
    // Better support for BigInts
    // https://github.com/brianc/node-postgres/issues/166#issuecomment-9514935
    this.pg.types.setTypeParser(20, String);

    this.disconnectTimeoutId = null
    this.pendingQueries = 0
    this.clientDrained = true
    this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50)
    this.ConnectionParameters = require(pgModule + '/lib/connection-parameters')

    this.onProcessExit = function () {
      this.disconnect()
    }.bind(this);

    process.on('exit', this.onProcessExit)
  }
  // Mix the abstract connector manager's prototype members into this dialect's
  // prototype (Utils._ is presumably underscore/lodash — confirm). The
  // assignments to ConnectorManager.prototype further down run afterwards and
  // therefore replace any same-named member copied here.
  Utils._.extend(ConnectorManager.prototype, require("../abstract/connector-manager").prototype)

  /**
   * Bookkeeping hook run after a query has finished.
   *
   * Decrements the pending-query counter. When pooling is off and the counter
   * reached zero, a disconnect is scheduled 100 ms later; the counter is
   * checked again at that point so a query started in the meantime keeps the
   * connection alive.
   */
  ConnectorManager.prototype.endQuery = function() {
    var manager = this

    manager.pendingQueries -= 1

    // Pooled clients are released by the pool; busy managers stay connected.
    if (manager.pooling || manager.pendingQueries !== 0) {
      return
    }

    setTimeout(function() {
      if (manager.pendingQueries === 0) {
        manager.disconnect()
      }
    }, 100)
  }

  /**
   * Runs a SQL statement on a connected client.
   *
   * Increments the pending-query counter, obtains a client through connect(),
   * executes the statement through a new Query and, whatever the outcome,
   * calls endQuery() and the release callback handed over by connect() (if any)
   * exactly once.
   *
   * @param {String} sql        Statement to execute.
   * @param {Object} callee     Passed through to the Query constructor.
   * @param {Object} [options]  Passed through to the Query constructor; defaults to {}.
   * @returns {Promise} Settles with the outcome of query.run(sql), or rejects
   *                    with the connection error when connect() fails.
   */
  ConnectorManager.prototype.query = function(sql, callee, options) {
    var self = this

    // Balances the pendingQueries++ below and hands a pooled client back.
    var finish = function(done) {
      self.endQuery()
      done && done()
    }

    self.pendingQueries++
    self.clientDrained = false

    return self.connect().then(function(done) {
      var running

      try {
        running = new Query(self.client, self.sequelize, callee, options || {}).run(sql)
      } catch (err) {
        // The query never started; without this the counter and the client would leak.
        finish(done)
        throw err
      }

      // We return the query regardless of error or success in the query
      return running.finally(function () {
        finish(done)
      })
    }, function(err) {
      // connect() failed, so no query ran and there is no release callback.
      // The counter still has to be decremented, otherwise it never returns
      // to zero and the idle disconnect in endQuery() is disabled for good.
      finish()
      throw err
    })
  }

  /**
   * Hook run once a transaction has been set up: switches the current client's
   * session to UTC and invokes `callback` via setTimezone().
   *
   * @param {Function} callback  Handed to setTimezone() unchanged.
   */
  ConnectorManager.prototype.afterTransactionSetup = function(callback) {
    var manager = this

    manager.setTimezone(manager.client, 'UTC', callback)
  }

  /**
   * Obtains a connected client and stores it on `this.client`.
   *
   * With pooling enabled a client is requested from the driver via
   * pg.connect(); otherwise the existing `this.client` is reused or a new
   * one-off pg.Client is created and connected.
   *
   * @param {Function} [callback]  Not used by this implementation.
   * @returns {Promise} Resolves with the `done` argument the driver handed to
   *                    the connect callback (undefined when none was given),
   *                    or with nothing when the early-exit guard or the
   *                    "no client" fallback is hit. Rejects with an Error
   *                    when connecting fails.
   */
  ConnectorManager.prototype.connect = function(callback) {
    var self = this

    // The executor is bound to the manager (see .bind(this) at the end), so
    // `this` and `self` refer to the same object inside it.
    return new Utils.Promise(function (resolve, reject) {
      // in case database is slow to connect, prevent orphaning the client
      // TODO: We really need some sort of queue/flush/drain mechanism
      // NOTE(review): this path resolves without a client being available.
      if (this.isConnecting && !this.pooling && this.client === null) {
        return resolve()
      }

      this.isConnecting = true
      this.isConnected  = false

      // Build the connection URI from the stored config and let the driver's
      // ConnectionParameters class parse it.
      var uri    = this.sequelize.getQueryInterface().QueryGenerator.databaseConnectionUri(this.config)
        , config = new this.ConnectionParameters(uri)

      // set pooling parameters if specified
      if (this.pooling) {
        config.poolSize           = this.config.pool.maxConnections || 10
        config.poolIdleTimeout    = this.config.pool.maxIdleTime    || 30000
        config.reapIntervalMillis = this.config.pool.reapInterval   || 1000
        config.uuid               = this.config.uuid
      }
      // Shared completion handler for the pooled path, the reuse path and the
      // one-off client path (including its 'end'-before-connect case).
      var connectCallback = function(err, client, done) {
        // Final success step: publish the client and settle the promise with `done`.
        var timezoneCallback = function() {
            self.isConnected = true
            self.client = client
            resolve(done)
          }

        self.isConnecting = false

        if (!!err) {
          // release the pool immediately, very important.
          done && done(err)
          self.client = null

          // Known network error codes are replaced by a fixed message; other
          // coded errors are passed through untouched, errors without a code
          // are re-wrapped with only their message.
          if (err.code) {
            switch(err.code) {
            case 'ECONNREFUSED':
              reject(new Error("Failed to authenticate for PostgresSQL. Please double check your settings."))
              break
            case 'ENOTFOUND':
            case 'EHOSTUNREACH':
            case 'EINVAL':
              reject(new Error("Failed to find PostgresSQL server. Please double check your settings."))
              break
            default:
              reject(err)
              break
            }
          } else {
            reject(new Error(err.message))
          }
        } else if (client) {
          // Normal case: switch the session to UTC first unless the config
          // asks to keep the server's default timezone.
          if (self.config.keepDefaultTimezone) {
            timezoneCallback()
          } else {
            self.setTimezone(client, 'UTC', timezoneCallback)
          }
        } else if (self.config.native) {
          // No client argument in native mode: fall back to self.client.
          // NOTE(review): timezoneCallback then assigns the (falsy) `client`
          // argument to self.client — confirm whether this branch is reachable.
          if (self.config.keepDefaultTimezone) {
            timezoneCallback()
          } else {
            self.setTimezone(self.client, 'UTC', timezoneCallback)  
          }          
        } else {
          // Neither an error nor a client: release and resolve with nothing.
          done && done()
          self.client = null
          resolve()
        }
      }

      if (this.pooling) {
        // acquire client from pool
        this.pg.connect(config, connectCallback)
      } else {
        if (!!this.client) {
          // Reuse the already created one-off client; there is no `done` here.
          connectCallback(null, this.client)
        } else {
          //create one-off client

          // Tracks whether the client.connect() callback has fired.
          var responded = false

          this.client = new this.pg.Client(config)
          this.client.connect(function(err, client, done) {
            responded = true
            connectCallback(err, client || self.client, done)
          })

          // If the client emits 'end' before the client.connect() callback was
          // ever invoked, report it as a connection timeout. Per the original
          // author, node-postgres does not treat this as an error because no
          // query was active at that point.
          this.client.on('end', function () {
            if (!responded) {
              connectCallback(new Error('Connection timed out'))
            }
          })

          // Closes a client correctly even if we have backed up queries
          // https://github.com/brianc/node-postgres/pull/346
          // disconnect() only calls client.end() while clientDrained is true.
          this.client.on('drain', function() {
            self.clientDrained = true
          })
        }
      }
    }.bind(this))
  }

  /**
   * Issues `SET TIME ZONE '<timezone>'` on the given client.
   *
   * @param {Object}   client    Object exposing query(sql); the value returned
   *                             by query() must expose on(event, listener).
   * @param {String}   [timezone="UTC"]  Zone to set; any falsy value means "UTC".
   * @param {Function} callback  Registered as the 'end' listener of the issued query.
   */
  ConnectorManager.prototype.setTimezone = function(client, timezone, callback) {
    // The zone is spliced into a SQL string literal, so embedded single quotes
    // are doubled to keep the value inside the literal. String() guards
    // against non-string input (e.g. a numeric offset).
    var zone = String(timezone || "UTC").replace(/'/g, "''")

    client.query("SET TIME ZONE '" + zone + "'").on('end', callback)
  }

  /**
   * Drops the current client and resets the connection state flags.
   *
   * The client's end() is only called while `clientDrained` is true; in either
   * case the reference on `this.client` is cleared. Without a client only the
   * flags are reset.
   */
  ConnectorManager.prototype.disconnect = function() {
    var activeClient = this.client

    if (activeClient && this.clientDrained) {
      activeClient.end()
    }

    if (activeClient) {
      this.client = null
    }

    this.isConnected  = false
    this.isConnecting = false
  }

  return ConnectorManager
})()