// worker.js
pubSubService.subscribe('driver-log-sync', 'driver-log-sync', pubSubService.syncDriverLog)
pubSubService.subscribe('driver-log-sync-delete', 'driver-log-sync-delete', pubSubService.syncDriverLogDelete)
pubSubService.subscribe('hos-event-history-sync', 'hos-event-history-sync', pubSubService.syncHosEventHistory)
// PubSubService.js
const Pubsub = require('@google-cloud/pubsub') const LoggingService = require('./loggingService')
const DriverLogsService = require('./driverLogsService')
const path = require('path')
const PQueue = require('p-queue')
let config = {
projectId: process.env.GCLOUD_PROJECT
}
const pubsub = Pubsub(config)
const queue = new PQueue({concurrency: 4})
function subscribe (topicName = '', subscriptionName = '', cb) {
function handleMessage (message) {
message.data = message.data.toString('utf-8')
cb(null, message)
}
function handleError (err) {
console.log('ERR')
LoggingService.error(err)
}
if (subscriptionName !== '' && topicName !== '') {
let topic = pubsub.topic(topicName)
let subscription = topic.subscription(subscriptionName)
subscription.on('message', handleMessage)
subscription.on('error', handleError)
LoggingService.info(`Listening to topic ${topicName} via subscription ${subscriptionName}`)
} else {
cb(new Error('Missing topic name and/or subscription name.'))
}
}
async function syncDriverLog (err, message) {
try {
if (err) {
throw new Error(err.message)
}
let { dotNumber, logDate, driverId } = message.attributes
if (!dotNumber) {
throw new Error('Missing DOT number for driver-log-sync')
}
if (!logDate) {
throw new Error('Missing Log Date for driver-log-sync')
}
if (!driverId) {
throw new Error('Missing Driver Id for driver-log-sync')
}
queue.add(async () => {
try {
await delay(25)
await DriverLogsService.syncDriverLogToElasticSearch(dotNumber, logDate, driverId, (new Date(message.publishTime).getTime()))
message.ack()
LoggingService.log(`Successfully synced log for driver: ${driverId}, dotNumber: ${dotNumber}, logDate: ${logDate}`)
} catch (err) {
message.ack()
LoggingService.error(`Error syncing log to ElasticSearch for driver: ${driverId}, dotNumber: ${dotNumber}, logDate: ${logDate}`, err)
}
})
} catch (err) {
message.ack()
LoggingService.error(`Error syncing log to ElasticSearch for message: ${message.attributes}`, err)
}
}
async function syncDriverLogDelete (err, message) {
try {
if (err) {
throw new Error(err.message)
}
let { dotNumber, logDate, driverId } = message.attributes
if (!dotNumber) {
throw new Error('Missing DOT number for driver-log-sync')
}
if (!logDate) {
throw new Error('Missing Log Date for driver-log-sync')
}
if (!driverId) {
throw new Error('Missing Driver Id for driver-log-sync')
}
queue.add(async () => {
try {
await delay(25)
await DriverLogsService.deleteDriverLogFromElasticSearch(dotNumber, logDate, driverId)
message.ack()
LoggingService.log(`Successfully deleted log for driver: ${driverId}, dotNumber: ${dotNumber}, logDate: ${logDate}`)
} catch (err) {
message.ack()
LoggingService.error(`Error deleting log for driver: ${driverId}, dotNumber: ${dotNumber}, logDate: ${logDate}`, err)
}
})
} catch (err) {
message.ack()
LoggingService.error(`Error syncing log to ElasticSearch for message: ${message.attributes}`, err)
}
}
async function syncHosEventHistory (err, message) {
try {
if (err) {
throw new Error(err.message)
}
let { hosEventHistoryNodeId, driverId } = message.attributes
let { hosEvent } = JSON.parse(message.data)
if (!hosEventHistoryNodeId) {
throw new Error('Missing HOS Event History Node Id for hos-event-history-sync')
}
if (!hosEvent) {
throw new Error('Missing HOS Event for hos-event-history-sync')
}
if (!driverId) {
throw new Error('Missing User Id for hos-event-history-sync')
}
queue.add(async () => {
try {
await delay(25)
await DriverLogsService.syncHosEventHistoryToElasticSearch(hosEventHistoryNodeId, hosEvent, driverId, (new Date(message.publishTime).getTime()))
message.ack()
LoggingService.log(`Successfully synced history for driver: ${driverId}, historyNodeId: ${hosEventHistoryNodeId}`)
} catch (err) {
message.ack()
LoggingService.error(`Error syncing log to ElasticSearch for message: ${message.attributes}`, err)
}
})
} catch (err) {
message.ack()
LoggingService.error('Error syncing log to ElasticSearch', err)
}
}
function delay (dur) {
return new Promise((resolve) => {
setTimeout(() => {
resolve()
}, dur)
})
}
module.exports = {
subscribe,
syncDriverLog,
syncDriverLogDelete,
syncHosEventHistory
}