不要怂,就是干,撸起袖子干!

connector-manager.js 6.07 KB
'use strict';

var Query = require('./query')
  , Utils = require('../../utils');

module.exports = (function() {
  var ConnectorManager = function(sequelize, config) {
    var pgModule = config.dialectModulePath || 'pg';

    this.sequelize = sequelize;
    this.client = null;
    this.config = config || {};
    this.config.port = this.config.port || 5432;
    this.pooling = (!!this.config.pool && (this.config.pool.maxConnections > 0));
    this.pg = this.config.native ? require(pgModule).native : require(pgModule);
    // Better support for BigInts
    // https://github.com/brianc/node-postgres/issues/166#issuecomment-9514935
    this.pg.types.setTypeParser(20, String);

    this.disconnectTimeoutId = null;
    this.pendingQueries = 0;
    this.clientDrained = true;
    this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50);
    this.ConnectionParameters = require(pgModule + '/lib/connection-parameters');

    this.onProcessExit = function() {
      this.disconnect();
    }.bind(this);

    process.on('exit', this.onProcessExit);
  };
  // Copy the members of the abstract connector manager's prototype onto this
  // dialect's prototype (Utils._ is presumably lodash/underscore - confirm).
  Utils._.extend(ConnectorManager.prototype, require('../abstract/connector-manager').prototype);

  /**
   * Record that one query has finished.
   *
   * When pooling is off and no queries remain, a disconnect is scheduled
   * 100 ms later; it only goes ahead if the manager is still idle by then.
   */
  ConnectorManager.prototype.endQuery = function() {
    var manager = this;

    manager.pendingQueries--;

    // Pooled clients are released elsewhere; busy managers stay connected.
    if (manager.pooling || manager.pendingQueries !== 0) {
      return;
    }

    setTimeout(function disconnectIfStillIdle() {
      // A new query may have started while the timer was pending.
      if (manager.pendingQueries === 0) {
        manager.disconnect();
      }
    }, 100);
  };

  /**
   * Run a SQL statement on a connected client.
   *
   * The pending-query counter is incremented up front and is always balanced
   * by a call to `endQuery`, whether the connection attempt fails or the
   * query itself settles.
   *
   * @param {string} sql - Statement handed to `Query#run`.
   * @param {*} callee - Passed through to the `Query` constructor.
   * @param {Object} [options] - Passed through to the `Query` constructor; defaults to `{}`.
   * @returns {Promise} Settles with whatever `Query#run` settles with, or
   *   rejects with the connection error.
   */
  ConnectorManager.prototype.query = function(sql, callee, options) {
    var self = this;

    self.pendingQueries++;
    self.clientDrained = false;

    return self.connect().then(function(done) {
      var query = new Query(self.client, self.sequelize, callee, options || {});

      // We return the query regardless of error or success in the query
      return query.run(sql).finally (function() {
        self.endQuery.call(self);
        // `done` is only present when the connect callback supplied one.
        done && done();
      });
    }, function(err) {
      // The connection never came up, so no query ran; undo the increment
      // above, otherwise the counter never returns to zero and the idle
      // disconnect in endQuery can no longer trigger.
      self.endQuery.call(self);
      throw err;
    });
  };

  /**
   * Hook run after a transaction has been set up: switches the current
   * client's session time zone to UTC.
   *
   * @param {Function} callback - Forwarded to `setTimezone` as its callback.
   */
  ConnectorManager.prototype.afterTransactionSetup = function(callback) {
    var currentClient = this.client;

    this.setTimezone(currentClient, 'UTC', callback);
  };

  /**
   * Obtain a connected client, either from the pg pool (pooling enabled) or
   * as a single shared one-off client, and, unless
   * `config.keepDefaultTimezone` is set, switch its session to UTC.
   *
   * On success `this.client` holds the client and `this.isConnected` is true.
   *
   * @param {Function} [callback] - Accepted for signature compatibility but
   *   never invoked by this implementation; use the returned promise.
   * @returns {Promise} Resolves with the release function handed to the
   *   connect callback, if any (undefined otherwise). Rejects when the
   *   connection attempt or the time-zone switch fails.
   */
  ConnectorManager.prototype.connect = function(callback) {
    var self = this;

    return new Utils.Promise(function(resolve, reject) {
      // in case database is slow to connect, prevent orphaning the client
      // TODO: We really need some sort of queue/flush/drain mechanism
      if (this.isConnecting && !this.pooling && this.client === null) {
        return resolve();
      }

      this.isConnecting = true;
      this.isConnected = false;

      var uri = this.sequelize.getQueryInterface().QueryGenerator.databaseConnectionUri(this.config)
        , config = new this.ConnectionParameters(uri);

      // set pooling parameters if specified
      if (this.pooling) {
        config.poolSize = this.config.pool.maxConnections || 10;
        config.poolIdleTimeout = this.config.pool.maxIdleTime || 30000;
        config.reapIntervalMillis = this.config.pool.reapInterval || 1000;
        config.uuid = this.config.uuid;
      }
      var connectCallback = function(err, client, done) {
        var timezoneCallback = function(timezoneErr) {
            // setTimezone forwards query errors here. Dropping them would
            // report a healthy connection after a failed SET TIME ZONE.
            if (timezoneErr) {
              // hand the client back to the pool flagged with the error
              done && done(timezoneErr);
              return reject(timezoneErr);
            }

            self.isConnected = true;
            // In the native branch `client` is falsy and the live client is
            // already on self.client; do not overwrite it with nothing.
            self.client = client || self.client;
            resolve(done);
          };

        self.isConnecting = false;

        if (!!err) {
          // release the pool immediately, very important.
          done && done(err);
          self.client = null;

          if (err.code) {
            switch (err.code) {
            case 'ECONNREFUSED':
              reject(new Error('Failed to authenticate for PostgresSQL. Please double check your settings.'));
              break;
            case 'ENOTFOUND':
            case 'EHOSTUNREACH':
            case 'EINVAL':
              reject(new Error('Failed to find PostgresSQL server. Please double check your settings.'));
              break;
            default:
              reject(err);
              break;
            }
          } else {
            reject(new Error(err.message));
          }
        } else if (client) {
          if (self.config.keepDefaultTimezone) {
            timezoneCallback();
          } else {
            self.setTimezone(client, 'UTC', timezoneCallback);
          }
        } else if (self.config.native) {
          // No client was handed to the callback: fall back to self.client.
          if (self.config.keepDefaultTimezone) {
            timezoneCallback();
          } else {
            self.setTimezone(self.client, 'UTC', timezoneCallback);
          }
        } else {
          done && done();
          self.client = null;
          resolve();
        }
      };

      if (this.pooling) {
        // acquire client from pool
        this.pg.connect(config, connectCallback);
      } else {
        if (!!this.client) {
          // reuse the existing one-off client
          connectCallback(null, this.client);
        } else {
          //create one-off client

          var responded = false;

          this.client = new this.pg.Client(config);
          this.client.connect(function(err, client, done) {
            responded = true;
            connectCallback(err, client || self.client, done);
          });

          // If the client emits 'end' before the client.connect() callback
          // ever ran, the connection attempt timed out. node-postgres does
          // not report that as an error because no query was active, so it
          // is turned into one here.
          this.client.on('end', function() {
            if (!responded) {
              connectCallback(new Error('Connection timed out'));
            }
          });

          // Closes a client correctly even if we have backed up queries
          // https://github.com/brianc/node-postgres/pull/346
          this.client.on('drain', function() {
            self.clientDrained = true;
          });
        }
      }
    }.bind(this));
  };

  /**
   * Issue `SET TIME ZONE` on the given client.
   *
   * @param {Object} client - Client whose `query()` result exposes a chainable `on()`.
   * @param {string} [timezone] - Zone name; any falsy value falls back to 'UTC'.
   * @param {Function} callback - Called with the error on 'error', with no
   *   arguments on 'end'.
   */
  ConnectorManager.prototype.setTimezone = function(client, timezone, callback) {
    var zone = timezone ||  'UTC';
    var statement = "SET TIME ZONE '" + zone + "'";

    var onError = function (err) {
      callback(err);
    };
    var onEnd = function () {
      callback();
    };

    var pending = client.query(statement);

    // The 'end' listener goes on whatever the first on() call returns.
    pending.on('error', onError).on('end', onEnd);
  };

  /**
   * Drop the current client and reset the connection flags.
   *
   * The client's `end()` is only called when `clientDrained` is true; in
   * either case the reference on the manager is cleared.
   */
  ConnectorManager.prototype.disconnect = function() {
    var current = this.client;

    if (current) {
      if (this.clientDrained) {
        current.end();
      }

      this.client = null;
    }

    // Flags are reset even when there was no client to drop.
    this.isConnecting = false;
    this.isConnected = false;
  };

  return ConnectorManager;
})();