PubSub subscriber on App Engine seems to work on initial start but gradually stops acknowledging

614 views
Skip to first unread message

Ben Hunt

unread,
Jan 3, 2018, 6:00:50 PM1/3/18
to Google Cloud Pub/Sub Discussions
I have sort of a weird issue and explaining it is somewhat difficult so please bear with me.

I have a worker service on App Engine that I'm using to pull messages off of a PubSub topic. It just listens to the topic and, when it receives a message, pushes it to an ElasticSearch instance that I have. I have set the minimum number of instances of the worker to a high number (20) to attempt to process a large number of records in the queue. The issue is that, on initial startup, the instances seem to process and acknowledge a bunch of records, but after a while they stop acknowledging them (see the images below; the large spikes in the acknowledge graph seem to correspond to deployments/restarts of the worker instances).

I have attached my code. The subscription listener is started by calling the subscribe method of PubSubService.js from worker.js; I've attached the relevant portions of worker.js.

Any help is greatly appreciated!

// worker.js
// Attach each handler to its subscription. Every entry is
// [topicName, subscriptionName, handler]; topic and subscription share a name.
for (const [topicName, subscriptionName, handler] of [
  ['driver-log-sync', 'driver-log-sync', pubSubService.syncDriverLog],
  ['driver-log-sync-delete', 'driver-log-sync-delete', pubSubService.syncDriverLogDelete],
  ['hos-event-history-sync', 'hos-event-history-sync', pubSubService.syncHosEventHistory]
]) {
  pubSubService.subscribe(topicName, subscriptionName, handler)
}

// PubSubService.js
const
Pubsub = require('@google-cloud/pubsub')
const LoggingService = require('./loggingService')
const DriverLogsService = require('./driverLogsService')
const path = require('path')
const PQueue = require('p-queue')

// Client configuration: the project id is read from the GCLOUD_PROJECT
// environment variable.
let config = {
 projectId: process.env.GCLOUD_PROJECT
}

// Pub/Sub client used by subscribe() to look up topics and subscriptions.
const pubsub = Pubsub(config)

// Single work queue shared by all three sync handlers in this module; it is
// configured to run at most 4 queued tasks concurrently.
const queue = new PQueue({concurrency: 4})

/**
 * Start listening on a Pub/Sub subscription and forward every message to `cb`.
 *
 * `cb` is invoked Node-style: `cb(null, message)` for each received message
 * (with `message.data` already decoded to a UTF-8 string), or `cb(err)` when
 * the topic or subscription name is missing. Subscription-level errors are
 * logged through LoggingService and are not passed to `cb`.
 *
 * @param {string} [topicName=''] Name of the topic the subscription belongs to.
 * @param {string} [subscriptionName=''] Name of the subscription to listen on.
 * @param {Function} cb Handler `(err, message)`; may be sync or async.
 */
function subscribe (topicName = '', subscriptionName = '', cb) {
  function handleError (err) {
    console.log('ERR')
    LoggingService.error(err)
  }

  // Invoke the handler without letting its failure escape: a synchronous throw
  // would otherwise propagate into the subscription's event emitter, and a
  // rejected promise from an async handler would otherwise go unhandled.
  function dispatch (err, message) {
    try {
      Promise.resolve(cb(err, message)).catch(handleError)
    } catch (thrown) {
      handleError(thrown)
    }
  }

  function handleMessage (message) {
    // Handlers expect text; replace the raw payload with its UTF-8 decoding.
    message.data = message.data.toString('utf-8')
    dispatch(null, message)
  }

  if (subscriptionName === '' || topicName === '') {
    dispatch(new Error('Missing topic name and/or subscription name.'))
    return
  }

  // NOTE(review): no subscription options (e.g. flow control) are passed here,
  // so the client library's defaults decide how many messages are delivered
  // before earlier ones are acked — confirm they suit slow handlers.
  const topic = pubsub.topic(topicName)
  const subscription = topic.subscription(subscriptionName)
  subscription.on('message', handleMessage)
  subscription.on('error', handleError)
  LoggingService.info(`Listening to topic ${topicName} via subscription ${subscriptionName}`)
}

/**
 * Handle a `driver-log-sync` message: validate its attributes, then queue a
 * task that syncs the driver log to ElasticSearch.
 *
 * Whenever a message was received it is acknowledged exactly once, whether
 * validation or the sync succeeds or fails; failures are only logged.
 * NOTE(review): acking on failure presumably means a failed message is never
 * redelivered — confirm that dropping it is intended.
 *
 * @param {?Error} err Error reported by the subscriber, if any.
 * @param {Object} [message] Received message; `attributes`, `publishTime` and
 *   `ack()` are used. May be absent when `err` is set.
 * @returns {Promise} Settles once the queued task has finished, or
 *   immediately when the message is rejected before queueing.
 */
async function syncDriverLog (err, message) {
  try {
    if (err) {
      // Rethrow the original error so its stack reaches the log below.
      throw err
    }
    const { dotNumber, logDate, driverId } = message.attributes
    if (!dotNumber) {
      throw new Error('Missing DOT number for driver-log-sync')
    }
    if (!logDate) {
      throw new Error('Missing Log Date for driver-log-sync')
    }
    if (!driverId) {
      throw new Error('Missing Driver Id for driver-log-sync')
    }
    // Returned without `await`: the task handles its own failures, so they
    // must not fall into the outer catch (which would ack a second time).
    return queue.add(async () => {
      try {
        await delay(25)
        await DriverLogsService.syncDriverLogToElasticSearch(dotNumber, logDate, driverId, (new Date(message.publishTime).getTime()))
      } catch (syncErr) {
        message.ack()
        LoggingService.error(`Error syncing log to ElasticSearch for driver: ${driverId}, dotNumber: ${dotNumber}, logDate: ${logDate}`, syncErr)
        return
      }
      message.ack()
      LoggingService.log(`Successfully synced log for driver: ${driverId}, dotNumber: ${dotNumber}, logDate: ${logDate}`)
    })
  } catch (caught) {
    // `message` is undefined when the subscriber reported an error without
    // one; there is nothing to acknowledge in that case.
    if (message) {
      message.ack()
    }
    // Serialize the attributes; interpolating the object prints "[object Object]".
    LoggingService.error(`Error syncing log to ElasticSearch for message: ${JSON.stringify(message && message.attributes)}`, caught)
  }
}

/**
 * Handle a `driver-log-sync-delete` message: validate its attributes, then
 * queue a task that deletes the driver log from ElasticSearch.
 *
 * Whenever a message was received it is acknowledged exactly once, whether
 * validation or the delete succeeds or fails; failures are only logged.
 * NOTE(review): acking on failure presumably means a failed message is never
 * redelivered — confirm that dropping it is intended.
 *
 * @param {?Error} err Error reported by the subscriber, if any.
 * @param {Object} [message] Received message; `attributes` and `ack()` are
 *   used. May be absent when `err` is set.
 * @returns {Promise} Settles once the queued task has finished, or
 *   immediately when the message is rejected before queueing.
 */
async function syncDriverLogDelete (err, message) {
  try {
    if (err) {
      // Rethrow the original error so its stack reaches the log below.
      throw err
    }
    const { dotNumber, logDate, driverId } = message.attributes
    if (!dotNumber) {
      throw new Error('Missing DOT number for driver-log-sync-delete')
    }
    if (!logDate) {
      throw new Error('Missing Log Date for driver-log-sync-delete')
    }
    if (!driverId) {
      throw new Error('Missing Driver Id for driver-log-sync-delete')
    }
    // Returned without `await`: the task handles its own failures, so they
    // must not fall into the outer catch (which would ack a second time).
    return queue.add(async () => {
      try {
        await delay(25)
        await DriverLogsService.deleteDriverLogFromElasticSearch(dotNumber, logDate, driverId)
      } catch (deleteErr) {
        message.ack()
        LoggingService.error(`Error deleting log for driver: ${driverId}, dotNumber: ${dotNumber}, logDate: ${logDate}`, deleteErr)
        return
      }
      message.ack()
      LoggingService.log(`Successfully deleted log for driver: ${driverId}, dotNumber: ${dotNumber}, logDate: ${logDate}`)
    })
  } catch (caught) {
    // `message` is undefined when the subscriber reported an error without
    // one; there is nothing to acknowledge in that case.
    if (message) {
      message.ack()
    }
    // Serialize the attributes; interpolating the object prints "[object Object]".
    LoggingService.error(`Error deleting log from ElasticSearch for message: ${JSON.stringify(message && message.attributes)}`, caught)
  }
}

/**
 * Handle a `hos-event-history-sync` message: read the node id and driver id
 * from the attributes and the `hosEvent` from the JSON body, then queue a task
 * that syncs the event history to ElasticSearch.
 *
 * Whenever a message was received it is acknowledged exactly once, whether
 * parsing, validation or the sync succeeds or fails; failures are only logged.
 * NOTE(review): acking on failure presumably means a failed message is never
 * redelivered — confirm that dropping it is intended.
 *
 * @param {?Error} err Error reported by the subscriber, if any.
 * @param {Object} [message] Received message; `attributes`, `data` (JSON
 *   text), `publishTime` and `ack()` are used. May be absent when `err` is set.
 * @returns {Promise} Settles once the queued task has finished, or
 *   immediately when the message is rejected before queueing.
 */
async function syncHosEventHistory (err, message) {
  try {
    if (err) {
      // Rethrow the original error so its stack reaches the log below.
      throw err
    }
    const { hosEventHistoryNodeId, driverId } = message.attributes
    // A malformed body throws here and is handled by the outer catch.
    const { hosEvent } = JSON.parse(message.data)
    if (!hosEventHistoryNodeId) {
      throw new Error('Missing HOS Event History Node Id for hos-event-history-sync')
    }
    if (!hosEvent) {
      throw new Error('Missing HOS Event for hos-event-history-sync')
    }
    if (!driverId) {
      throw new Error('Missing Driver Id for hos-event-history-sync')
    }
    // Returned without `await`: the task handles its own failures, so they
    // must not fall into the outer catch (which would ack a second time).
    return queue.add(async () => {
      try {
        await delay(25)
        await DriverLogsService.syncHosEventHistoryToElasticSearch(hosEventHistoryNodeId, hosEvent, driverId, (new Date(message.publishTime).getTime()))
      } catch (syncErr) {
        message.ack()
        // Serialize the attributes; interpolating the object prints "[object Object]".
        LoggingService.error(`Error syncing log to ElasticSearch for message: ${JSON.stringify(message.attributes)}`, syncErr)
        return
      }
      message.ack()
      LoggingService.log(`Successfully synced history for driver: ${driverId}, historyNodeId: ${hosEventHistoryNodeId}`)
    })
  } catch (caught) {
    // `message` is undefined when the subscriber reported an error without
    // one; there is nothing to acknowledge in that case.
    if (message) {
      message.ack()
    }
    LoggingService.error('Error syncing log to ElasticSearch', caught)
  }
}

/**
 * Build a promise that resolves (with no value) once a timer of `dur`
 * milliseconds has fired.
 *
 * @param {number} dur Timer duration in milliseconds.
 * @returns {Promise<undefined>}
 */
function delay (dur) {
  const armTimer = (done) => {
    setTimeout(done, dur)
  }
  return new Promise(armTimer)
}

// Public API of this module: the subscription helper plus the three message
// handlers that worker.js passes to it as callbacks.
module.exports = {
 subscribe,
 syncDriverLog,
 syncDriverLogDelete,
 syncHosEventHistory
}



Jordan (Cloud Platform Support)

unread,
Jan 4, 2018, 12:00:54 AM1/4/18
to Google Cloud Pub/Sub Discussions
Just for clarification, in order to be "listening to the topic" it is required to use a PUSH App Engine Endpoint and not a PULL worker. 

A PULL would require the worker to always be running in a loop to constantly PULL for messages (which would cost a lot of instance hours). App Engine has a request timeout for how long your code is allowed to run, therefore an infinite loop of PULLing is not possible. 

Ben Hunt

unread,
Jan 4, 2018, 10:40:15 AM1/4/18
to Google Cloud Pub/Sub Discussions
Thanks for the reply. If that were the case, wouldn't my instances be getting killed? I have the worker responding to health checks which I assume keeps it alive and able to pull. I basically just followed the tutorial here, which uses the pull method on a worker running in app engine. https://cloud.google.com/nodejs/getting-started/using-pub-sub#application_structure

Jordan (Cloud Platform Support)

unread,
Jan 4, 2018, 11:57:29 PM1/4/18
to Google Cloud Pub/Sub Discussions
I see, so you are not actually using request handlers to start your PULL subscriber event listeners, but are simply starting them right when your application loads. Therefore you are not restricted to the request timeout of the App Engine instance (though this still requires that your instances always run, which costs more; having your application send PUSH messages to your background workers via an endpoint will only have them run when there is actual work).

I see you have correctly posted this issue on the Pub/Sub Google Cloud Client Library for NodeJS. Google Groups is reserved for general product discussions and not for technical support or for reporting Google-end issues. All further communications about this issue will occur in your issue report with the Pub/Sub Google engineering team.

Ben Hunt

unread,
Jan 5, 2018, 9:56:25 AM1/5/18
to Google Cloud Pub/Sub Discussions
Thanks for replying, Jordan. I have a question for you that's more about using push/pull. The reason I chose the pull method is because of this article https://cloud.google.com/pubsub/docs/subscriber#push_pull. I'm expecting to have a very large number of messages (more than 1/s) and the reasons for choosing pull over push outlined in the article seem to apply to my case. You seem to be recommending the push method, however. Can you elaborate?

Jordan (Cloud Platform Support)

unread,
Jan 5, 2018, 11:38:52 AM1/5/18
to Google Cloud Pub/Sub Discussions
If you require batch message delivery then yes PULL is the way to go, since PUSH will only send one message at a time (as they come in). 

Note that since your Worker service is on App Engine, PUSH requests to your Worker service will automatically kick up instances of your Worker code and scale to the incoming traffic. So if there are a lot of messages coming in, App Engine is designed to handle these requests so more than 1/s shouldn't be any issue. 

Of course with PUSH the Pub/Sub servers handle the rate of flow of messages for you, so if you require specific control over the delivery of messages then PULL is required.

- It's always ideal to weigh cost/performance and choose the best option that fits your specific requirements. 

Ben Hunt

unread,
Jan 5, 2018, 12:13:28 PM1/5/18
to Google Cloud Pub/Sub Discussions
Great, thanks!

Kir Titievsky

unread,
Jan 5, 2018, 12:41:02 PM1/5/18
to Ben Hunt, Google Cloud Pub/Sub Discussions
Note that Pub/Sub PUSH subscriptions can deliver messages at a rate of up to 10K messages/second.  So that's the real bar. That said, it is more efficient to process messages in batches if they are coming at "more" than 1/second and the benefits of scaling to low throughput that App Engine provides are less material.  Where the trade-off justifies switching from the push to the pull model will depend on your use case.  But it is not a bad rule of thumb to start with PUSH if you are on App Engine.

--
You received this message because you are subscribed to the Google Groups "Google Cloud Pub/Sub Discussions" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cloud-pubsub-discuss+unsub...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cloud-pubsub-discuss/795ff55b-9e34-44c8-80eb-0ff43f308a8f%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.



--
Kir Titievsky | Product Manager | Google Cloud Pub/Sub 

Ben Hunt

unread,
Jan 5, 2018, 6:17:49 PM1/5/18
to Google Cloud Pub/Sub Discussions
Thanks Kir, I'm going to try to switch to the push method to see if that fits my needs. I'll be honest that it's kind of frustrating that the documentation makes it seem like you should use the pull method if you expect any high rate of messages.

Jordan (Cloud Platform Support)

unread,
Jan 8, 2018, 10:42:33 AM1/8/18
to Google Cloud Pub/Sub Discussions
The documentation team is very open to suggestions and improvements. To help the documentation team provide the best information for your requirements, it is recommended to use the 'Send Feedback' button at the top of any Google Cloud page. This will directly inform the documentation team of your experiences and suggestions for improvement.
Reply all
Reply to author
Forward
0 new messages