1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 | 173 173 173 173 173 234 3 231 231 231 231 231 231 231 231 231 231 231 231 231 231 198 198 198 138 138 12 60 60 32 32 32 32 32 32 6 6 26 26 26 26 26 26 26 28 28 173 173 173 173 173 2219 2219 2216 3 2219 2219 2219 2219 2219 6 2219 2219 173 756 756 756 180 756 756 756 20 20 20 19 20 20 20 736 598 598 138 138 12 138 173 601 601 601 601 597 597 597 597 594 601 601 601 601 601 3 601 601 173 198 601 173 592 592 601 173 7 7 7 7 7 601 601 173 659 659 659 659 594 659 1253 1253 637 637 625 625 185 659 3 3 3 173 5 5 10 10 10 5 5 7 173 | (function () { 'use strict'; const net = require('net'); const util = require('util'); const EventEmitter = require('events').EventEmitter; const debug = util.debuglog('http'); // New Agent code. // The largest departure from the previous implementation is that // an Agent instance holds connections for a variable number of host:ports. // Surprisingly, this is still API compatible as far as third parties are // concerned. The only code that really notices the difference is the // request object. // Another departure is that all code related to HTTP parsing is in // ClientRequest.onSocket(). The Agent is now *strictly* // concerned with managing a connection pool. function Agent(options) { if (!(this instanceof Agent)) return new Agent(options); EventEmitter.call(this); var self = this; self.defaultPort = 80; self.protocol = 'http:'; self.options = util._extend({}, options); // don't confuse net and make it think that we're connecting to a pipe self.options.path = null; self.requests = {}; self.sockets = {}; self.freeSockets = {}; self.keepAliveMsecs = self.options.keepAliveMsecs || 1000; self.keepAlive = self.options.keepAlive || false; self.maxSockets = self.options.maxSockets || Agent.defaultMaxSockets; self.maxFreeSockets = self.options.maxFreeSockets || 256; self.on('free', function(socket, options) { var name = self.getName(options); debug('agent.on(free)', name); if (!socket.destroyed && self.requests[name] && self.requests[name].length) { self.requests[name].shift().onSocket(socket); if (self.requests[name].length === 0) { // don't leak delete self.requests[name]; } } else { // If there are no pending requests, then put it in // the freeSockets pool, but only if we're allowed to do so. var req = socket._httpMessage; if (req && req.shouldKeepAlive && !socket.destroyed && self.options.keepAlive) { var freeSockets = self.freeSockets[name]; var freeLen = freeSockets ? freeSockets.length : 0; var count = freeLen; Eif (self.sockets[name]) count += self.sockets[name].length; if (count > self.maxSockets || freeLen >= self.maxFreeSockets) { self.removeSocket(socket, options); socket.destroy(); } else { freeSockets = freeSockets || []; self.freeSockets[name] = freeSockets; socket.setKeepAlive(true, self.keepAliveMsecs); socket.unref(); socket._httpMessage = null; self.removeSocket(socket, options); freeSockets.push(socket); } } else { self.removeSocket(socket, options); socket.destroy(); } } }); } util.inherits(Agent, EventEmitter); exports.Agent = Agent; Agent.defaultMaxSockets = Infinity; Agent.prototype.createConnection = net.createConnection; // Get the key for a given set of request options Agent.prototype.getName = function(options) { var name = ''; if (options.host) name += options.host; else name += 'localhost'; name += ':'; Eif (options.port) name += options.port; name += ':'; if (options.localAddress) name += options.localAddress; name += ':'; return name; }; Agent.prototype.addRequest = function(req, options) { // Legacy API: addRequest(req, host, port, path) Iif (typeof options === 'string') { options = { host: options, port: arguments[2], path: arguments[3] }; } var name = this.getName(options); if (!this.sockets[name]) { this.sockets[name] = []; } var freeLen = this.freeSockets[name] ? this.freeSockets[name].length : 0; var sockLen = freeLen + this.sockets[name].length; if (freeLen) { // we have a free socket, so use that. var socket = this.freeSockets[name].shift(); debug('have free socket'); // don't leak if (!this.freeSockets[name].length) delete this.freeSockets[name]; socket.ref(); req.onSocket(socket); this.sockets[name].push(socket); } else if (sockLen < this.maxSockets) { debug('call onSocket', sockLen, freeLen); // If we are under maxSockets create a new one. req.onSocket(this.createSocket(req, options)); } else { debug('wait for socket'); // We are over limit so we'll add it to the queue. if (!this.requests[name]) { this.requests[name] = []; } this.requests[name].push(req); } }; Agent.prototype.createSocket = function(req, options) { var self = this; options = util._extend({}, options); options = util._extend(options, self.options); if (!options.servername) { options.servername = options.host; Eif (req) { var hostHeader = req.getHeader('host'); if (hostHeader) { options.servername = hostHeader.replace(/:.*$/, ''); } } } var name = self.getName(options); debug('createConnection', name, options); options.encoding = null; var s = self.createConnection(options); if (!self.sockets[name]) { self.sockets[name] = []; } this.sockets[name].push(s); debug('sockets', name, this.sockets[name].length); function onFree() { self.emit('free', s, options); } s.on('free', onFree); function onClose(err) { debug('CLIENT socket onClose'); // This is the only place where sockets get removed from the Agent. // If you want to remove a socket from the pool, just close it. // All socket errors end in a close event anyway. self.removeSocket(s, options); } s.on('close', onClose); function onRemove() { // We need this function for cases like HTTP 'upgrade' // (defined by WebSockets) where we need to remove a socket from the // pool because it'll be locked up indefinitely debug('CLIENT socket onRemove'); self.removeSocket(s, options); s.removeListener('close', onClose); s.removeListener('free', onFree); s.removeListener('agentRemove', onRemove); } s.on('agentRemove', onRemove); return s; }; Agent.prototype.removeSocket = function(s, options) { var name = this.getName(options); debug('removeSocket', name, 'destroyed:', s.destroyed); var sets = [this.sockets]; // If the socket was destroyed, remove it from the free buffers too. if (s.destroyed) sets.push(this.freeSockets); for (var sk = 0; sk < sets.length; sk++) { var sockets = sets[sk]; if (sockets[name]) { var index = sockets[name].indexOf(s); if (index !== -1) { sockets[name].splice(index, 1); // Don't leak if (sockets[name].length === 0) delete sockets[name]; } } } if (this.requests[name] && this.requests[name].length) { debug('removeSocket, have a request, make a socket'); var req = this.requests[name][0]; // If we have pending requests and a socket gets closed make a new one this.createSocket(req, options).emit('free'); } }; Agent.prototype.destroy = function() { var sets = [this.freeSockets, this.sockets]; for (var s = 0; s < sets.length; s++) { var set = sets[s]; var keys = Object.keys(set); for (var v = 0; v < keys.length; v++) { var setName = set[keys[v]]; for (var n = 0; n < setName.length; n++) { setName[n].destroy(); } } } }; exports.globalAgent = new Agent(); }()); |