"""Neptune bulk-load worker (from a Gremlin-users mailing-list post).

Original question: "I am loading 600k vertices into AWS Neptune using 4
workers with gremlin-python. After a few minutes I see this error. My worker
code is below. Is there a better way to handle connection? Possibly using
connection pooling?"

    gremlin_python.driver.protocol.GremlinServerError: 500:
    {"requestId":"10ce5f4d-c016-4559-b506-6f33f355cdfb",
     "code":"ConcurrentModificationException",
     "detailedMessage":"Failed to complete Insert operation for a Vertex due
      to conflicting concurrent operations. Please retry."}

Neptune raises ConcurrentModificationException when concurrent transactions
touch the same element; AWS's documented guidance is simply to retry the
request with backoff, which this revision does via submit_with_retry().
Several bugs in the posted code are also fixed (see inline NOTE/BUG comments).
"""
import json
import os
import time
from datetime import datetime, timedelta

import boto3
import botocore

from gremlin_python import statics
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.driver.protocol import GremlinServerError
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.traversal import (
    Barrier, Cardinality, Column, Direction, Operator, Order, P, Pop, Scope, T,
)
from gremlin_python.structure.graph import Graph

# One remote connection per worker process; gremlin-python multiplexes
# requests over the underlying websocket, so no extra pooling is required.
graph = Graph()
remote = DriverRemoteConnection('ws://localhost:8182/gremlin', 'g')
g = graph.traversal().withRemote(remote)

boto3.setup_default_session(region_name='us-east-1')
sqs = boto3.client('sqs')
url = ''  # SQS queue URL
s3 = boto3.resource('s3')
BUCKET_NAME = ''

MAX_RETRIES = 5

# Substrings of an ENI description mapped to the node type they imply.
# Order matters: first match wins, mirroring the original if/elif chain.
_DESCRIPTION_NODE_TYPES = [
    ('Lambda', 'lambda'),
    ('Glue', 'glue'),
    ('EFS', 'efs'),
    ('ELB', 'elb'),
    ('RDSNetworkInterface', 'rds'),
    ('ElasticMapReduce', 'emr'),
    ('ElastiCache', 'elasticache'),
    ('RedshiftNetworkInterface', 'redshift'),
]


def classify_node_type(description, default='ec2'):
    """Return the node type implied by an ENI description string."""
    for needle, node_type in _DESCRIPTION_NODE_TYPES:
        if needle in description:
            return node_type
    return default


def submit_with_retry(traversal, max_retries=MAX_RETRIES):
    """Execute ``traversal.toList()``, retrying on Neptune's transient
    ConcurrentModificationException with exponential backoff.

    This is the fix for the error in the question: Neptune explicitly asks
    the client to retry ("Please retry.") when concurrent transactions
    conflict.  Any other server error is re-raised immediately.
    """
    for attempt in range(max_retries):
        try:
            return traversal.toList()
        except GremlinServerError as e:
            retriable = 'ConcurrentModificationException' in str(e)
            if not retriable or attempt == max_retries - 1:
                raise
            time.sleep(0.1 * (2 ** attempt))  # 0.1s, 0.2s, 0.4s, ...


def load_eni_file(path):
    """Insert one vertex per line of an ENI CSV file."""
    with open(path, 'r') as f:
        for line in f:
            fields = line.split(',')
            (label, node_type, eni_id, description, account, region,
             availability_zone, vpc_id, subnet_id, private_ip_address,
             private_dns_name, name_tag, business_unit_tag,
             business_region_tag, platform_tag, client_tag,
             resourcecreatedby_tag, first_time_stamp, last_time_stamp,
             inserted_date) = fields[:20]
            # The type carried in the file is overridden by what the ENI
            # description tells us (defaulting to 'ec2'), as in the original.
            node_type = classify_node_type(description)
            vertex = (
                g.addV(label)
                .property('type', node_type)
                .property('eni_id', eni_id)
                .property('description', description)
                .property('account', account)
                .property('region', region)
                .property('availability_zone', availability_zone)
                .property('vpc_id', vpc_id)
                .property('subnet_id', subnet_id)
                .property('private_ip_address', private_ip_address)
                .property('private_dns_name', private_dns_name)
                .property('name_tag', name_tag)
                .property('business_unit_tag', business_unit_tag)
                .property('business_region_tag', business_region_tag)
                .property('platform_tag', platform_tag)
                .property('client_tag', client_tag)
                .property('resourcecreatedby_tag', resourcecreatedby_tag)
                .property('first_time_stamp', first_time_stamp)
                .property('last_time_stamp', last_time_stamp)
                .property('inserted_date', inserted_date)
            )
            submit_with_retry(vertex)


def load_flow_file(path):
    """Insert one edge per line of a VPC flow-log CSV file."""
    with open(path, 'r') as f:
        for line in f:
            fields = line.strip().split(',')
            (label, account, interfaceid, srcaddr, dstaddr, srcport,
             dstport, protocol, packets, txbytes, starttime, endtime,
             action, logstatus) = fields[:14]
            started = datetime.strptime(starttime, '%Y-%m-%dT%H:%M:%S')
            # As in the original: the stored endtime is one hour before start.
            endtime = (started - timedelta(hours=1)).strftime('%Y-%m-%dT%H:%M:%S')
            # BUG FIX: the original extracted the vertex id with
            # str(toList()).strip("[v[]"), which mangles ids containing those
            # characters; use the returned Vertex object directly instead.
            eni_vertices = g.V().has('eni_id', interfaceid).toList()
            if not eni_vertices:
                # Original would have sent an invalid id; skip cleanly.
                continue
            eni = eni_vertices[0]
            # NOTE(review): the original also computed two vertex lookups
            # (v1/v2) that were never used, and compared 'last_time_stamp'
            # against the literal strings 'endtime'/'starttime' instead of
            # the variables — dead, buggy code, removed here.
            # NOTE(review): the posted code draws the edge from the ENI
            # vertex to itself (a self-loop); confirm whether it should join
            # the src/dst address vertices instead.
            edge = (
                g.V(eni)
                .addE(str(label))
                .to(eni)
                .property('account', account)
                .property('interfaceid', interfaceid)
                .property('srcaddr', srcaddr)
                .property('dstaddr', dstaddr)  # BUG FIX: original stored srcaddr here
                .property('srcport', srcport)
                .property('dstport', dstport)
                .property('protocol', protocol)
                .property('packets', packets)
                .property('bytes', txbytes)
                .property('starttime', starttime)
                .property('endtime', endtime)
                .property('action', action)
                .property('logstatus', logstatus)
            )
            submit_with_retry(edge)


def main():
    """Poll SQS for file notifications and load each file into Neptune."""
    while True:
        response = sqs.receive_message(
            QueueUrl=url, WaitTimeSeconds=20, MaxNumberOfMessages=1)
        if 'Messages' not in response:
            continue
        message = response['Messages'][0]
        data = json.loads(message['Body'])
        msg_type = data['msgType']
        # BUG FIX: the original reused the name `key` as the enumerate()
        # loop variable, shadowing the S3 object key.
        key = data['path']
        name = key.split('/')[-1]
        local_path = '/tmp/' + name
        try:
            s3.Bucket(BUCKET_NAME).download_file(key, local_path)
            print('started:', datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"))
            if 'eni' in msg_type:
                load_eni_file(local_path)
            else:
                load_flow_file(local_path)
            print('stopped:', datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"))
            # Delete the message only after a fully successful load so a
            # crashed worker lets SQS redeliver the work.
            sqs.delete_message(
                QueueUrl=url, ReceiptHandle=message['ReceiptHandle'])
            os.remove(local_path)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == "404":
                print("The object does not exist.")
            else:
                raise


if __name__ == '__main__':
    main()
You received this message because you are subscribed to the Google Groups "Gremlin-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gremlin-users+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/gremlin-users/bfb36524-de42-43ee-9f8c-4c66fcdfc4ba%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.