import cPickle as pickle
from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor
from twisted.internet.protocol import ClientCreator
from twisted.python import log
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
import txamqp.spec
from jasmin.vendor.smpp.pdu.pdu_types import DataCoding
import psycopg2
# Module-level scratch dict (not referenced in the visible part of this script).
q = dict()

# Database connection shared by the AMQP consumer in this script.
# This example targets PostgreSQL through psycopg2; substitute your own
# driver and connection settings as needed.
db = psycopg2.connect(
    host='host',
    database='dbname',
    user='user',
    password='password',
)
cursor = db.cursor()
def _reassemble_short_message(pdu):
    """Join the text of a (possibly multipart) deliver_sm PDU chain.

    Parts are linked through the ``nextPdu`` attribute. When more than one
    part is present, the first 6 bytes of every part's ``short_message`` are
    dropped (they are treated as the UDH). If the last part's ``data_coding``
    is ``8`` or a ``DataCoding`` whose ``schemeData`` is ``UCS2``, the joined
    payload is transcoded from UTF-16-BE to UTF-8.

    :param pdu: first PDU of the chain.
    :returns: ``(short_message, pdu_count, last_pdu)`` tuple.
    """
    short_message = pdu.params['short_message']
    pdu_count = 1

    # Is it a multipart message ?
    while hasattr(pdu, 'nextPdu'):
        # Remove UDH from first part
        if pdu_count == 1:
            short_message = short_message[6:]

        pdu = pdu.nextPdu

        # Update values:
        pdu_count += 1
        short_message += pdu.params['short_message'][6:]

    # If it's a binary message, assume it's utf_16_be encoded
    dc = pdu.params['data_coding']
    if dc is not None:
        is_ucs2 = ((isinstance(dc, int) and dc == 8) or
                   (isinstance(dc, DataCoding) and str(dc.schemeData) == 'UCS2'))
        if is_ucs2:
            short_message = short_message.decode('utf_16_be', 'ignore').encode('utf_8')

    return short_message, pdu_count, pdu


@inlineCallbacks
def gotConnection(conn, username, password):
    """Authenticate on the AMQP connection and log every SMS-MO to the database.

    Declares ``sms_mo_logger_queue``, binds it to ``deliver_sm_thrower.*`` on
    the ``messaging`` exchange and consumes it one message at a time
    (``prefetch_count=1``). For each matching message the pickled PDU is
    loaded, its parts are reassembled, and the placeholder INSERT is executed
    and committed through the module-level ``cursor`` / ``db``.

    :param conn: connected AMQP client (result of ``connectTCP``).
    :param username: AMQP login.
    :param password: AMQP password.
    """
    yield conn.start({"LOGIN": username, "PASSWORD": password})

    chan = yield conn.channel(1)
    yield chan.channel_open()

    yield chan.queue_declare(queue="sms_mo_logger_queue")

    # Bind to deliver_sm_thrower.* to track SMS-MO
    yield chan.queue_bind(queue="sms_mo_logger_queue", exchange="messaging", routing_key='deliver_sm_thrower.*')

    yield chan.basic_qos(prefetch_count=1)
    yield chan.basic_consume(queue='sms_mo_logger_queue', consumer_tag="sms_mo_logger")

    queue = yield conn.queue("sms_mo_logger")

    # Wait for messages
    # This can be done through a callback ...
    while True:
        msg = yield queue.get()

        if msg.routing_key.startswith('deliver_sm_thrower.'):
            props = msg.content.properties

            # SECURITY: pickle.loads executes arbitrary code embedded in the
            # payload. Only consume from a broker whose publishers are trusted.
            pdu = pickle.loads(msg.content.body)

            # After this call `pdu` refers to the LAST part of the chain, so
            # the addresses below are read from that part.
            short_message, pdu_count, pdu = _reassemble_short_message(pdu)

            # Get the data which you need; these are the values available to
            # the INSERT statement (the SQL below is a placeholder to replace).
            qmsg = {
                'message-id': props['message-id'],
                'source_connector': props['headers']['src-connector-id'],
                'routed_cid': props['headers']['dst-connector-id'],
                'destination_addr': pdu.params['destination_addr'],
                'source_addr': pdu.params['source_addr'],
                'pdu_count': pdu_count,
                'short_message': short_message
            }

            # NOTE(review): when filling in the SQL, pass values as query
            # parameters (e.g. cursor.execute(sql, qmsg)) rather than
            # formatting them into the string. These DB calls are made
            # synchronously from the reactor thread.
            sql = "YOUR INSERT SQL WITH THE DATA YOU WANT"
            cursor.execute(sql)
            db.commit()

        # Ack every delivery (matching or not): with prefetch_count=1 an
        # un-acked message would block all further deliveries. The ack is
        # yielded so that a failure surfaces here instead of being dropped.
        yield chan.basic_ack(delivery_tag=msg.delivery_tag)

    # A clean way to tear down and stop
    # NOTE: unreachable as written, since the loop above never exits normally;
    # kept as the reference shutdown sequence should an exit condition be added.
    yield chan.basic_cancel("sms_mo_logger")
    yield chan.channel_close()
    chan0 = yield conn.channel(0)
    yield chan0.connection_close()

    reactor.stop()
if __name__ == "__main__":
    # AMQP broker endpoint, credentials and protocol spec file
    host = '127.0.0.1'
    port = 5672
    vhost = '/'
    username = 'guest'
    password = 'guest'
    spec_file = '/etc/jasmin/resource/amqp0-9-1.xml'

    def whoops(err):
        """Log the failure and stop the reactor, unless it is no longer running."""
        if not reactor.running:
            return
        log.err(err)
        reactor.stop()

    spec = txamqp.spec.load(spec_file)

    # Connect and authenticate
    creator = ClientCreator(reactor, AMQClient, delegate=TwistedDelegate(), vhost=vhost, spec=spec)
    d = creator.connectTCP(host, port)

    # Hand the connection to the consumer; any failure along the chain
    # (connect, login, consume) ends up in whoops.
    d.addCallback(gotConnection, username, password)
    d.addErrback(whoops)

    reactor.run()