duplex stream getting changed in callback:node js

8 views
Skip to first unread message

Bhavesh Joshi

unread,
Mar 31, 2018, 11:30:53 AM3/31/18
to nodejs
I have a code where I am getting a duplex stream and in that function I call a callback which return me the values from redis.


          function (index, arr, null, callback) {
          const streamObject = stream;
          const Id = arr[index].split(':')[0];
          const Version = arr[index].split(':')[1];
    
    
          console.log("STREAM RECEIVED IN SECOND 1");
          console.log(streamObject);//printing for the first time
          var response = this.ts.service(Id, Version, streamObject, (fn, type) => {
            console.log("STREAM RECEIVED IN SECOND 2");
            console.log(streamObject);
    }

here when I print the stream object for the first time I get the stream object as follows

    STREAM RECEIVED IN SECOND 1
    Stream {
      domain: 
       Domain {
         domain: null,
         _events: { error: [Function] },
         _eventsCount: 1,
         _maxListeners: undefined,
         members: [] },
      _events: 
       { end: [Function],
         data: [Function],
         drain: [Function: ondrain],
         error: [Function: onerror],
         close: [Function: cleanup] },
      _eventsCount: 5,
      _maxListeners: undefined,
      writable: true,
      readable: true,
      paused: false,
      autoDestroy: true,
      write: [Function],
      push: [Function],
      queue: [Function],
      end: [Function],
      destroy: [Function],
      pause: [Function],
      resume: [Function] }

and in the second time I get

STREAM RECEIVED IN SECOND 2

    Stream {
      domain: 
       Domain {
         domain: null,
         _events: { error: [Function] },
         _eventsCount: 1,
         _maxListeners: undefined,
         members: [] },
      _events: { end: [Function], data: [Function] },
      _eventsCount: 2,
      _maxListeners: undefined,
      writable: false,
      readable: false,
      paused: false,
      autoDestroy: true,
      write: [Function],
      push: [Function],
      queue: [Function],
      end: [Function],
      destroy: [Function],
      pause: [Function],
      resume: [Function],
      root: null }
so stream is getting modified I am not sure why is it happening, I am not doing anything with the stream in the function called.

this is how my service method look which I am calling 

    service(id, vn, requestObject, callback) {
        log.info("entering transformer service");
        var transformerCombinedKey = id + vn;
    
        if (!_.isString(transformerId)) {
          throw new TypeError("Expected a string for transformerId");
        }
    
        if (!(transformerCombinedKey in this.functionRepositoryStore)) {
          //FIXME: after using cache as a implementation should ret
          var persistenceClientPromise = this.persistenceClient.getTransformerByIdAndVersion(transformerId, versionNumber)
    
          persistenceClientPromise.then(
            (aJSONStringifiedTransformer) => {
              if (!aJSONStringifiedTransformer) {
                callback(new Error("The given transformerId, " + transformerId + ", was not found."));
                return;
              }
    
              this.functionRepositoryStore[transformerCombinedKey] = JSON.parse(aJSONStringifiedTransformer).transformerFunction;
              this.transformerTypeRepository[transformerCombinedKey] = JSON.parse(aJSONStringifiedTransformer).transformerType;
    
              const code = "var _ = require('lodash'); console.log('runnig VM1'); module.exports = " + this.functionRepositoryStore[transformerCombinedKey] + ';';
              var transformerFunction = vm.runInNewContext(code, sandbox);
              console.log("Calling callback inside the transformer service===1 ");
              console.log(JSON.stringify(transformerFunction));
              callback(transformerFunction, this.transformerTypeRepository[transformerCombinedKey]);
              return
            },
            (error) => {
              log.error("Error while getting transformer for Id " + transformerId + " and versionNumber " + versionNumber);
              callback(error, null);
            });
    
    
        } else {
    
          const code = "var _ = require('lodash'); console.log('runnig VM2'); module.exports = " + this.functionRepositoryStore[transformerCombinedKey] + '; ';
          var transformerFunction = vm.runInNewContext(code, sandbox);
          console.log("Calling callback inside the transformer service=== ");
          console.log(JSON.stringify(transformerFunction));
          callback(transformerFunction, this.transformerTypeRepository[transformerCombinedKey]);
        }
      }
    
    }

this issue I see when I hit my app for the first time, when I hit it again without restarting the app, It works fine stream remain duplex also.
and also if I remove my call to redis this method , then stream doesn't change it works fine.

I am using waterfall and this is the second function of my waterfall model like

    async.waterfall([func1,func2], function (err, result) {
            if (err) {
              console.log(err);
              reject(err);
            } else {
              fulfill(result);
            }
    
          });
this first function also do the same thing but it works fine, the output of first is passed to the second.
this is how I create my stream

 

       let streamParser = JSONStream.parse('*');
                    streamParser.on('data', fn);
        
                    let toPassStreamObject = streamObject.pipe(JSONStream.stringify())
                      .pipe(streamParser)

streamObject is the stream which I get from my DB.

    fn (data) {
          data['Date'] = data["month"];
          delete data['month'];
        }


I stuck on this for some time.how to prevent stream from changing.

this is the link to stakoverflow
  


Reply all
Reply to author
Forward
0 new messages