@@ -8,216 +8,213 @@ const uv = process.binding('uv');
88const debug = util.debuglog('stream_wrap');
99const errors = require('internal/errors');
1010
11- function StreamWrap(stream) {
12- const handle = new JSStream();
13-
14- this.stream = stream;
15-
16- this._list = null;
17-
18- const self = this;
19- handle.close = function(cb) {
20- debug('close');
21- self.doClose(cb);
22- };
23- handle.isAlive = function() {
24- return self.isAlive();
25- };
26- handle.isClosing = function() {
27- return self.isClosing();
28- };
29- handle.onreadstart = function() {
30- return self.readStart();
31- };
32- handle.onreadstop = function() {
33- return self.readStop();
34- };
35- handle.onshutdown = function(req) {
36- return self.doShutdown(req);
37- };
38- handle.onwrite = function(req, bufs) {
39- return self.doWrite(req, bufs);
40- };
41-
42- this.stream.pause();
43- this.stream.on('error', function onerror(err) {
44- self.emit('error', err);
45- });
46- this.stream.on('data', function ondata(chunk) {
47- if (typeof chunk === 'string' || this._readableState.objectMode === true) {
48- // Make sure that no further `data` events will happen
49- this.pause();
50- this.removeListener('data', ondata);
51-
52- self.emit('error', new errors.Error('ERR_STREAM_WRAP'));
53- return;
54- }
55-
56- debug('data', chunk.length);
57- if (self._handle)
58- self._handle.readBuffer(chunk);
59- });
60- this.stream.once('end', function onend() {
61- debug('end');
62- if (self._handle)
63- self._handle.emitEOF();
64- });
65-
66- Socket.call(this, {
67- handle: handle
68- });
69- }
70- util.inherits(StreamWrap, Socket);
71- module.exports = StreamWrap;
72-
73- // require('_stream_wrap').StreamWrap
74- StreamWrap.StreamWrap = StreamWrap;
75-
76- StreamWrap.prototype.isAlive = function isAlive() {
77- return true;
78- };
79-
80- StreamWrap.prototype.isClosing = function isClosing() {
81- return !this.readable || !this.writable;
82- };
83-
84- StreamWrap.prototype.readStart = function readStart() {
85- this.stream.resume();
86- return 0;
87- };
88-
89- StreamWrap.prototype.readStop = function readStop() {
90- this.stream.pause();
91- return 0;
92- };
93-
94- StreamWrap.prototype.doShutdown = function doShutdown(req) {
95- const self = this;
96- const handle = this._handle;
97- const item = this._enqueue('shutdown', req);
98-
99- this.stream.end(function() {
100- // Ensure that write was dispatched
101- setImmediate(function() {
102- if (!self._dequeue(item))
11+ /* This class serves as a wrapper for when the C++ side of Node wants access
12+ * to a standard JS stream. For example, TLS or HTTP do not operate on network
13+ * resources conceptually, although that is the common case and what we are
14+ * optimizing for; in theory, they are completely composable and can work with
15+ * any stream resource they see.
16+ *
17+ * For the common case, i.e. a TLS socket wrapping around a net.Socket, we
18+ * can skip going through the JS layer and let TLS access the raw C++ handle
19+ * of a net.Socket. The flipside of this is that, to maintain composability,
20+ * we need a way to create "fake" net.Socket instances that call back into a
21+ * "real" JavaScript stream. JSStreamWrap is exactly this.
22+ */
23+ class JSStreamWrap extends Socket {
24+ constructor(stream) {
25+ const handle = new JSStream();
26+ handle.close = (cb) => {
27+ debug('close');
28+ this.doClose(cb);
29+ };
30+ handle.isAlive = () => this.isAlive();
31+ handle.isClosing = () => this.isClosing();
32+ handle.onreadstart = () => this.readStart();
33+ handle.onreadstop = () => this.readStop();
34+ handle.onshutdown = (req) => this.doShutdown(req);
35+ handle.onwrite = (req, bufs) => this.doWrite(req, bufs);
36+
37+ stream.pause();
38+ stream.on('error', (err) => this.emit('error', err));
39+ const ondata = (chunk) => {
40+ if (typeof chunk === 'string' ||
41+ stream._readableState.objectMode === true) {
42+ // Make sure that no further `data` events will happen.
43+ stream.pause();
44+ stream.removeListener('data', ondata);
45+
46+ this.emit('error', new errors.Error('ERR_STREAM_WRAP'));
10347 return;
48+ }
10449
105- handle.finishShutdown(req, 0);
50+ debug('data', chunk.length);
51+ if (this._handle)
52+ this._handle.readBuffer(chunk);
53+ };
54+ stream.on('data', ondata);
55+ stream.once('end', () => {
56+ debug('end');
57+ if (this._handle)
58+ this._handle.emitEOF();
10659 });
107- });
108- return 0;
109- };
11060
111- StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
112- const self = this;
113- const handle = self._handle;
61+ super({ handle, manualStart: true });
62+ this.stream = stream;
63+ this._list = null;
64+ this.read(0);
65+ }
11466
115- var pending = bufs.length;
67+ // Legacy
68+ static get StreamWrap() {
69+ return JSStreamWrap;
70+ }
11671
117- // Queue the request to be able to cancel it
118- const item = self._enqueue('write', req);
72+ isAlive() {
73+ return true;
74+ }
11975
120- self.stream.cork();
121- for (var n = 0; n < bufs.length; n++)
122- self.stream.write(bufs[n], done);
123- self.stream.uncork();
76+ isClosing() {
77+ return !this.readable || !this.writable;
78+ }
12479
125- function done(err) {
126- if (!err && --pending !== 0)
127- return;
80+ readStart() {
81+ this.stream.resume();
82+ return 0;
83+ }
12884
129- // Ensure that this is called once in case of error
130- pending = 0;
85+ readStop() {
86+ this.stream.pause();
87+ return 0;
88+ }
13189
132- let errCode = 0;
133- if (err) {
134- const code = uv[`UV_${err.code}`];
135- errCode = (err.code && code) ? code : uv.UV_EPIPE;
136- }
90+ doShutdown(req) {
91+ const handle = this._handle;
92+ const item = this._enqueue('shutdown', req);
13793
138- // Ensure that write was dispatched
139- setImmediate(function() {
140- // Do not invoke callback twice
141- if (!self ._dequeue(item))
142- return;
94+ this.stream.end(() => {
95+ // Ensure that write was dispatched
96+ setImmediate(() => {
97+ if (!this ._dequeue(item))
98+ return;
14399
144- handle.doAfterWrite (req);
145- handle.finishWrite(req, errCode );
100+ handle.finishShutdown (req, 0 );
101+ } );
146102 });
103+ return 0;
147104 }
148105
149- return 0;
150- };
106+ doWrite(req, bufs) {
107+ const self = this;
108+ const handle = this._handle;
151109
152- function QueueItem(type, req) {
153- this.type = type;
154- this.req = req;
155- this.prev = this;
156- this.next = this;
157- }
110+ var pending = bufs.length;
158111
159- StreamWrap.prototype._enqueue = function _enqueue(type, req) {
160- const item = new QueueItem(type, req);
161- if (this._list === null) {
162- this._list = item;
163- return item;
164- }
112+ // Queue the request to be able to cancel it
113+ const item = this._enqueue('write', req);
114+
115+ this.stream.cork();
116+ for (var n = 0; n < bufs.length; n++)
117+ this.stream.write(bufs[n], done);
118+ this.stream.uncork();
119+
120+ function done(err) {
121+ if (!err && --pending !== 0)
122+ return;
123+
124+ // Ensure that this is called once in case of error
125+ pending = 0;
165126
166- item.next = this._list.next;
167- item.prev = this._list;
168- item.next.prev = item;
169- item.prev.next = item;
127+ let errCode = 0;
128+ if (err) {
129+ const code = uv[`UV_${err.code}`];
130+ errCode = (err.code && code) ? code : uv.UV_EPIPE;
131+ }
170132
171- return item;
172- };
133+ // Ensure that write was dispatched
134+ setImmediate(function() {
135+ // Do not invoke callback twice
136+ if (!self._dequeue(item))
137+ return;
173138
174- StreamWrap.prototype._dequeue = function _dequeue(item) {
175- assert(item instanceof QueueItem);
139+ handle.doAfterWrite(req);
140+ handle.finishWrite(req, errCode);
141+ });
142+ }
176143
177- var next = item.next ;
178- var prev = item.prev;
144+ return 0 ;
145+ }
179146
180- if (next === null && prev === null)
181- return false;
147+ _enqueue(type, req) {
148+ const item = new QueueItem(type, req);
149+ if (this._list === null) {
150+ this._list = item;
151+ return item;
152+ }
182153
183- item.next = null;
184- item.prev = null;
154+ item.next = this._list.next;
155+ item.prev = this._list;
156+ item.next.prev = item;
157+ item.prev.next = item;
185158
186- if (next === item) {
187- prev = null;
188- next = null;
189- } else {
190- prev.next = next;
191- next.prev = prev;
159+ return item;
192160 }
193161
194- if (this._list === item)
195- this._list = next ;
162+ _dequeue( item) {
163+ assert(item instanceof QueueItem) ;
196164
197- return true ;
198- } ;
165+ var next = item.next ;
166+ var prev = item.prev ;
199167
200- StreamWrap.prototype.doClose = function doClose(cb) {
201- const self = this;
202- const handle = self._handle;
168+ if (next === null && prev === null)
169+ return false;
203170
204- setImmediate(function() {
205- while (self._list !== null) {
206- const item = self._list;
207- const req = item.req;
208- self._dequeue(item);
171+ item.next = null;
172+ item.prev = null;
209173
210- const errCode = uv.UV_ECANCELED;
211- if (item.type === 'write') {
212- handle.doAfterWrite(req);
213- handle.finishWrite(req, errCode);
214- } else if (item.type === 'shutdown') {
215- handle.finishShutdown(req, errCode);
216- }
174+ if (next === item) {
175+ prev = null;
176+ next = null;
177+ } else {
178+ prev.next = next;
179+ next.prev = prev;
217180 }
218181
219- // Should be already set by net.js
220- assert(self._handle === null);
221- cb();
222- });
223- };
182+ if (this._list === item)
183+ this._list = next;
184+
185+ return true;
186+ }
187+
188+ doClose(cb) {
189+ const handle = this._handle;
190+
191+ setImmediate(() => {
192+ while (this._list !== null) {
193+ const item = this._list;
194+ const req = item.req;
195+ this._dequeue(item);
196+
197+ const errCode = uv.UV_ECANCELED;
198+ if (item.type === 'write') {
199+ handle.doAfterWrite(req);
200+ handle.finishWrite(req, errCode);
201+ } else if (item.type === 'shutdown') {
202+ handle.finishShutdown(req, errCode);
203+ }
204+ }
205+
206+ // Should be already set by net.js
207+ assert.strictEqual(this._handle, null);
208+ cb();
209+ });
210+ }
211+ }
212+
213+ function QueueItem(type, req) {
214+ this.type = type;
215+ this.req = req;
216+ this.prev = this;
217+ this.next = this;
218+ }
219+
220+ module.exports = JSStreamWrap;
0 commit comments