12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- 'use strict';
- var domain = require('domain');
- var eos = require('end-of-stream');
- var tick = require('process-nextick-args');
- var once = require('once');
- var exhaust = require('stream-exhaust');
- var eosConfig = {
- error: false,
- };
- function rethrowAsync(err) {
- process.nextTick(rethrow);
- function rethrow() {
- throw err;
- }
- }
- function tryCatch(fn, args) {
- try {
- return fn.apply(null, args);
- } catch (err) {
- rethrowAsync(err);
- }
- }
- function asyncDone(fn, cb) {
- cb = once(cb);
- var d = domain.create();
- d.once('error', onError);
- var domainBoundFn = d.bind(fn);
- function done() {
- d.removeListener('error', onError);
- d.exit();
- return tryCatch(cb, arguments);
- }
- function onSuccess(result) {
- done(null, result);
- }
- function onError(error) {
- if (!error) {
- error = new Error('Promise rejected without Error');
- }
- done(error);
- }
- function asyncRunner() {
- var result = domainBoundFn(done);
- function onNext(state) {
- onNext.state = state;
- }
- function onCompleted() {
- onSuccess(onNext.state);
- }
- if (result && typeof result.on === 'function') {
- // Assume node stream
- d.add(result);
- eos(exhaust(result), eosConfig, done);
- return;
- }
- if (result && typeof result.subscribe === 'function') {
- // Assume RxJS observable
- result.subscribe(onNext, onError, onCompleted);
- return;
- }
- if (result && typeof result.then === 'function') {
- // Assume promise
- result.then(onSuccess, onError);
- return;
- }
- }
- tick(asyncRunner);
- }
- module.exports = asyncDone;
|