Re: [TinkerPop] Is connection pooling possible with gremlin-python

968 views
Skip to first unread message

Stephen Mallette

unread,
May 14, 2018, 7:06:43 AM5/14/18
to Gremlin-users
I'm not extraordinarily familiar with Neptune, but that error message seems to indicate that you need to have some kind retry system in your code. I think that's a fairly normal expectation from some graph systems. I don't think any of the TinkerPop drivers have automated retry so you'll have to work something into your code for that.

On Fri, May 11, 2018 at 10:38 AM, <jnbank...@gmail.com> wrote:
I am loading 600k verticies into AWS Neptune using 4 workers with gremlin-python. After a few minutes I see this error. My worker code is below. Is there a better way to handle connection? Possibly using connection pooling? Thanks.

gremlin_python.driver.protocol.GremlinServerError: 500: {"requestId":"10ce5f4d-c016-4559-b506-6f33f355cdfb","code":"ConcurrentModificationException","detailedMessage":"Failed to complete Insert operation for a Vertex due to conflicting concurrent operations. Please retry."}

import boto3
import json, os
import botocore
import time
from datetime import datetime, timedelta
from gremlin_python import statics
from gremlin_python.structure.graph import Graph
from gremlin_python.process.graph_traversal import __
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.traversal import T
from gremlin_python.process.traversal import Order
from gremlin_python.process.traversal import Cardinality
from gremlin_python.process.traversal import Column
from gremlin_python.process.traversal import Direction
from gremlin_python.process.traversal import Operator
from gremlin_python.process.traversal import P
from gremlin_python.process.traversal import Pop
from gremlin_python.process.traversal import Scope
from gremlin_python.process.traversal import Barrier
from datetime import datetime, timedelta

graph = Graph()
g = graph.traversal().withRemote(DriverRemoteConnection('ws://localhost:8182/gremlin','g'))

boto3.setup_default_session(region_name='us-east-1')
sqs = boto3.client('sqs')
url = ''

s3 = boto3.resource('s3')

BUCKET_NAME = ''

while True:
    response = sqs.receive_message(QueueUrl=url,WaitTimeSeconds=20,MaxNumberOfMessages=1)
    if 'Messages' in response:
        data       = json.loads(response['Messages'][0]['Body'])
        msgType    = (data['msgType'])
        key        = (data['path'])
        file_path  = (data['path'].split('/'))
        path_index = len(file_path) -1
        name       = file_path[path_index]

        try:
            s3.Bucket(BUCKET_NAME).download_file(key, '/tmp/' + name)
            now = datetime.utcnow()
            print('started:', now.strftime("%Y-%m-%d %H:%M:%S"))
            if 'eni' in msgType:
                with open('/tmp/' + name,'r') as f:
                    for line in f:
                        result = line.split(',')
                        label                   = result[0]
                        node_type               = result[1]
                        eni_id                  = result[2]
                        description             = result[3]
                        account                 = result[4]
                        region                  = result[5]
                        availability_zone       = result[6]
                        vpc_id                  = result[7]
                        subnet_id               = result[8]
                        private_ip_address      = result[9]
                        private_dns_name        = result[10]
                        name_tag                = result[11]
                        business_unit_tag       = result[12]
                        business_region_tag     = result[13]
                        platform_tag            = result[14]
                        client_tag              = result[15]
                        resourcecreatedby_tag   = result[16]
                        first_time_stamp        = result[17]
                        last_time_stamp         = result[18]
                        inserted_date           = result[19]

                        if 'Lambda' in description:
                            node_type = 'lambda'
                        elif 'Glue' in description:
                            node_type = 'glue'
                        elif 'EFS' in description:
                            node_type = 'efs'
                        elif 'ELB' in description:
                            node_type = 'elb'
                        elif 'RDSNetworkInterface' in description:
                            node_type = 'rds'
                        elif 'ElasticMapReduce' in description:
                            node_type = 'emr'
                        elif 'ElastiCache' in description:
                            node_type = 'elasticache'
                        elif 'RedshiftNetworkInterface' in description:
                            node_type = 'redshift'
                        else:
                            node_type = 'ec2'

                        vertex = g.addV(label).property('type', node_type).property('eni_id', eni_id).property('description', description).property('account', account).property('region', region).property('availability_zone', availability_zone).property('vpc_id', vpc_id).property('subnet_id',subnet_id).property('private_ip_address', private_ip_address).property('private_dns_name',private_dns_name).property('name_tag', name_tag).property('business_unit_tag', business_unit_tag).property('business_region_tag',business_region_tag).property('platform_tag', platform_tag).property('client_tag', client_tag).property('resourcecreatedby_tag', resourcecreatedby_tag).property('first_time_stamp', first_time_stamp).property('last_time_stamp', last_time_stamp).property('inserted_date', inserted_date).toList()

            else:
                with open('/tmp/' + name,'r') as f:
                    content = f.readlines()
                    content = [x.strip() for x in content]
                    for key, item in enumerate(content):
                        result = content[key].split(',')
                        label               = result[0]
                        account             = result[1]
                        interfaceid         = result[2]
                        srcaddr             = result[3]
                        dstaddr             = result[4]
                        srcport             = result[5]
                        dstport             = result[6]
                        protocol            = result[7]
                        packets             = result[8]
                        txbytes             = result[9]
                        starttime           = result[10]
                        endtime             = result[11]
                        action              = result[12]
                        logstatus           = result[13]

                        datetime_object = datetime.strptime(starttime, '%Y-%m-%dT%H:%M:%S')
                        last_hour_date_time = datetime_object - timedelta(hours = 1)
                        endtime = last_hour_date_time.strftime('%Y-%m-%dT%H:%M:%S')

                        eni = str(g.V().has('eni_id', interfaceid).toList()).strip("[v[]")

                        v1 = g.V().has('private_ip_address', dstaddr).has('last_time_stamp', P.gte('endtime')).has('last_time_stamp', P.lte('starttime')).toList()

                        v2 = g.V().has('private_ip_address', srcaddr).has('last_time_stamp', P.gte('endtime')).has('last_time_stamp', P.lte('starttime')).toList()

                        edge = g.V(eni).addE(str(label)).to(g.V(eni)).property('account', account).property('interfaceid', interfaceid).property('srcaddr', srcaddr).property('dstaddr', srcaddr).property('srcport', srcport).property('dstport', dstport).property('protocol', protocol).property('packets', packets).property('bytes', txbytes).property('starttime', starttime).property('endtime', endtime).property('action', action).property('logstatus', logstatus).toList()

            now = datetime.utcnow()
            print('stopped:', now.strftime("%Y-%m-%d %H:%M:%S"))
            sqs.delete_message(QueueUrl=url,ReceiptHandle=response['Messages'][0]['ReceiptHandle'])
            os.remove('/tmp/' + name)


        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == "404":
                print("The object does not exist.")
            else:
                raise

--
You received this message because you are subscribed to the Google Groups "Gremlin-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gremlin-users+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/gremlin-users/bfb36524-de42-43ee-9f8c-4c66fcdfc4ba%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply all
Reply to author
Forward
0 new messages