I've multiple users to be added to a userlist. Below is the code for the same, Am I doing it correctly , can anyone lemme know.
def add_users(self, users: list):
offline_user_data_job_resource_name = self.create_offline_user_data_job()
request = self.google_client.get_type("AddOfflineUserDataJobOperationsRequest")
request.resource_name = offline_user_data_job_resource_name
request.operations = self.build_offline_user_data_job_operations(users, "add")
offline_user_data_job_run_response = self.run_offline_user_data_job(offline_user_data_job_resource_name)
def remove_users(self, user: list):
offline_user_data_job_resource_name = self.create_offline_user_data_job()
request = self.google_client.get_type("AddOfflineUserDataJobOperationsRequest")
request.resource_name = offline_user_data_job_resource_name
request.operations = self.build_offline_user_data_job_operations(user, "remove")
offline_user_data_job_run_response = self.run_offline_user_data_job(offline_user_data_job_resource_name)
def create_offline_user_data_job(self):
# Creates a new offline user data job.
user_list_resource_name = f"customers/{self.customer_id}/userLists/{self.audience_id}"
offline_user_data_job = self.google_client.get_type("OfflineUserDataJob")
offline_user_data_job.type_ = (
self.google_client.enums.OfflineUserDataJobTypeEnum.CUSTOMER_MATCH_USER_LIST
)
offline_user_data_job.customer_match_user_list_metadata.user_list = user_list_resource_name
# Issues a request to create an offline user data job.
create_offline_user_data_job_response = self.offline_user_data_job_service_client.create_offline_user_data_job(
customer_id=self.customer_id, job=offline_user_data_job
)
offline_user_data_job_resource_name = (
create_offline_user_data_job_response.resource_name
)
print(
"Created an offline user data job with resource name: "
f"{offline_user_data_job_resource_name}."
)
return offline_user_data_job_resource_name
def build_offline_user_data_job_operations(self, users: list, operation_type: str):
user_data_with_email_address_operation_list = []
try:
for email in users:
user_data_with_email_address_operation = self.google_client.get_type(
"OfflineUserDataJobOperation"
)
if operation_type == "add":
user_data_with_email_address = user_data_with_email_address_operation.create
else:
user_data_with_email_address = user_data_with_email_address_operation.remove
user_identifier_with_hashed_email = self.google_client.get_type("UserIdentifier")
user_identifier_with_hashed_email.hashed_email = email
user_data_with_email_address.user_identifiers.append(
user_identifier_with_hashed_email
)
user_data_with_email_address_operation_list.append(user_data_with_email_address_operation)
return user_data_with_email_address_operation_list
except GoogleAdsException as ex:
config.logger.info(f'Request with ID "{ex.request_id}" failed with status '
f'"{ex.error.code().name}" and includes the following errors:')
message = ''
for error in ex.failure.errors:
print(f'\tError with message "{error.message}".')
message += error.message + ' '
if error.location:
for field_path_element in error.location.field_path_elements:
print(f'\t\tOn field: {field_path_element.field_name}')
error = f'Error occured with status: {ex.error.code().name}, and message: {message}'
config.logger.error(error)
def run_offline_user_data_job(self, resource_name: str):
operation_response = self.offline_user_data_job_service_client.run_offline_user_data_job(resource_name)
return operation_response