Bom dia.
Sou novo em Python, então, se eu estiver falando besteira, desconsiderem.
Estou fazendo a assinatura da NFTS e da NFS-e da Prefeitura de São Paulo devido à reforma tributária.
A NFS-e foi de boas, mas a NFTS não vai de jeito nenhum.
Alguém consegue me ajudar com isso? Vou colocar aqui o código .py que desenvolvi.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Sign NFTS (tpNFTS) strings and produce TesteEnvioLoteNFTS SOAP envelope.
✅ CORREÇÃO FINAL ERRO 1009: SerieNFTS com TAMANHO FIXO 5 (preenchido com espaços à direita)
tpSerieNFTS: Tipo série de documento NFTS. C 1-5 caracteres
"""
import sys
import os
import tempfile
import logging
import base64
import copy
from collections import OrderedDict
from lxml import etree
from cryptography.hazmat.primitives.serialization import pkcs12, Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
import xmlsec
# Logging configuration: level name followed by the message, INFO and above.
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
logger = logging.getLogger("sign_nfts")
# XML namespaces. Each URI must be a single-line literal: a line break inside
# the quotes is a syntax error and would also corrupt the namespace value.
NS_SP = "http://www.prefeitura.sp.gov.br/nfts"
NS_SOAP = "http://schemas.xmlsoap.org/soap/envelope/"
# ---------------- Funções Auxiliares ----------------
def find_child(parent, tagname):
    """Return the first direct child of ``parent`` whose local name matches.

    The lookup compares ``local-name()`` only, so the child's namespace (if
    any) is ignored.

    Args:
        parent: Element exposing an lxml-style ``xpath`` method.
        tagname: Local (namespace-less) tag name to look for.

    Returns:
        The first matching child, or ``None`` when there is no match.
    """
    nodes = parent.xpath('./*[local-name() = $name]', name=tagname)
    return nodes[0] if nodes else None
def read_pkcs12(pfx_path, password):
    """Load the private key and certificate stored in a PFX (PKCS#12) file.

    Args:
        pfx_path: Path of the ``.pfx`` file.
        password: Password protecting the file. A ``str`` is encoded as
            UTF-8; ``bytes`` or ``None`` are handed to the loader unchanged.

    Returns:
        A ``(private_key, certificate)`` tuple.

    Raises:
        RuntimeError: If the file has no private key or no certificate.
    """
    with open(pfx_path, "rb") as f:
        data = f.read()
    # The loader takes the password as bytes (or None), never as str.
    if isinstance(password, str):
        password = password.encode("utf-8")
    # The third item (additional certificates) is not used by this script.
    private_key, cert, _additional = pkcs12.load_key_and_certificates(data, password)
    if private_key is None or cert is None:
        raise RuntimeError("Não foi possível extrair chave privada / certificado do PFX")
    return private_key, cert
def ensure_path_for_debug():
    """Return a directory in which debug files can be written.

    A fixed ``nfts_debug`` directory is used (one location on Windows,
    another elsewhere) and created when missing. If it cannot be created,
    the system temporary directory is returned instead.

    Returns:
        Path of the directory as ``str``.
    """
    if os.name == 'nt':
        dirpath = r'c:\temp\nfts_debug'
    else:
        dirpath = '/tmp/nfts_debug'
    try:
        os.makedirs(dirpath, exist_ok=True)
    except OSError:
        # Best effort: fall back to a directory that is known to exist.
        dirpath = tempfile.gettempdir()
    return dirpath
def normalize_numeric_string(text):
    """Strip left zero-padding from an all-digit string.

    Non-breaking spaces are turned into plain spaces and the value is
    trimmed first. Values that are not made only of digits are returned
    trimmed but otherwise untouched.

    Examples: ``'001' -> '1'``, ``'43643139000166' -> '43643139000166'``.

    Args:
        text: Raw element text, or ``None``.

    Returns:
        The normalised string; ``""`` when ``text`` is ``None``.
    """
    if text is None:
        return ""
    clean_text = text.replace('\xa0', ' ').strip()
    if clean_text.isdigit():
        try:
            return str(int(clean_text))
        except ValueError:
            # Some characters pass isdigit() yet are rejected by int();
            # such values are returned as-is below.
            pass
    return clean_text
# ✅ CORREÇÃO MANUAL: SerieNFTS tamanho FIXO 5 (preenchido com espaços à direita)
def normalize_serie_nfts(text):
    """Return the NFTS series as a fixed-width, 5-character string.

    The value is trimmed (non-breaking spaces count as spaces), right-padded
    with spaces up to 5 characters and cut at 5 characters.

    Args:
        text: Raw ``SerieNFTS`` text, or ``None``.

    Returns:
        The 5-character series; ``""`` when ``text`` is ``None``.
    """
    if text is None:
        return ""
    clean_text = text.replace('\xa0', ' ').strip()
    # ljust pads to the full width; appending a single space (as the code
    # did before) left short series shorter than 5 characters.
    return clean_text.ljust(5)[:5]
def normalize_float_value(text):
    """Format a numeric string with exactly two decimal places.

    Non-breaking spaces become plain spaces, a comma is accepted as the
    decimal separator and the value is trimmed before parsing.

    Args:
        text: Raw element text, or ``None``.

    Returns:
        The value formatted as ``'1500.00'``; the cleaned text unchanged
        when it cannot be parsed as a float; ``""`` when ``text`` is ``None``.
    """
    if text is None:
        return ""
    clean_text = text.replace('\xa0', ' ').replace(',', '.').strip()
    try:
        f_val = float(clean_text)
    except ValueError:
        return clean_text
    return "{:.2f}".format(f_val)
def normalize_boolean_value(text):
    """Map a truthy/falsy text to the lowercase literals 'true' / 'false'.

    The comparison is case-insensitive and ignores surrounding whitespace
    (non-breaking spaces included).

    Args:
        text: Raw element text, or ``None``.

    Returns:
        ``"true"`` for ``true``, ``1``, ``s``, ``sim``, ``t``, ``y`` or
        ``yes``; ``"false"`` for anything else, including ``None``.
    """
    if text is None:
        return "false"
    clean_text = text.replace('\xa0', ' ').strip().lower()
    if clean_text in ("true", "1", "s", "sim", "t", "y", "yes"):
        return "true"
    return "false"
# ---------------- Canonicalização para tpNFTS ----------------
def build_tpNFTS_string(tpnode):
    """Build the canonical ``<tpNFTS>`` XML string that is later signed.

    A new, namespace-free ``<tpNFTS>`` element is assembled from ``tpnode``.
    Only the fields listed in the order map below are copied, in that order,
    each normalised according to its declared kind:

    * ``num_str`` - digits without left zero-padding
    * ``float``   - two decimal places
    * ``bool``    - ``true`` / ``false``
    * ``serie``   - series normalisation (``normalize_serie_nfts``)
    * anything else (``str``, ``str_opt``) - trimmed text

    Fields that are absent, or whose normalised text is empty, are left out;
    groups that end up with no children are left out too. ``tpnode`` itself
    is not modified (the work is done on a deep copy).

    Args:
        tpnode: lxml element holding one NFTS record.

    Returns:
        The serialised fragment as ``str``, without XML declaration and
        without pretty-printing.
    """
    clean_tp = copy.deepcopy(tpnode)
    assin = find_child(clean_tp, "Assinatura")
    if assin is not None:
        clean_tp.remove(assin)

    # Field order and normalisation kind for the signed fragment. A nested
    # mapping describes a group element and the order of its children.
    # NOTE(review): "num_str" also drops leading zeros from CNPJ/CPF values;
    # confirm against the Prefeitura's manual that this is what is expected.
    canonical_order_map = OrderedDict([
        ("TipoDocumento", "num_str"),
        ("ChaveDocumento", {
            "InscricaoMunicipal": "str",
            "SerieNFTS": "serie",
            "NumeroDocumento": "num_str",
        }),
        ("DataPrestacao", "str"),
        ("StatusNFTS", "str"),
        ("TributacaoNFTS", "str"),
        ("ValorServicos", "float"),
        ("ValorDeducoes", "float"),
        ("CodigoServico", "num_str"),
        ("CodigoSubItem", "num_str"),
        ("AliquotaServicos", "float"),
        ("ISSRetidoTomador", "bool"),
        ("ISSRetidoIntermediario", "bool"),
        ("Prestador", {
            "CPFCNPJ": {
                "CNPJ": "num_str",
                "CPF": "num_str",
            },
            "InscricaoMunicipal": "str",
            "RazaoSocialPrestador": "str",
            "Endereco": {
                "TipoLogradouro": "str",
                "Logradouro": "str",
                "NumeroEndereco": "str",
                "ComplementoEndereco": "str_opt",
                "Bairro": "str",
                "Cidade": "num_str",
                "UF": "str",
                "CEP": "str",
            },
            "Email": "str_opt",
        }),
        ("RegimeTributacao", "num_str"),
        ("DataPagamento", "str_opt"),
        ("Discriminacao", "str"),
        ("TipoNFTS", "num_str"),
        ("Tomador", {
            "CPFCNPJ": {
                "CPF": "num_str",
                "CNPJ": "num_str",
            },
            "RazaoSocial": "str",
        }),
    ])

    # Kind -> normaliser. Kinds not listed here get the plain trim below.
    normalizers = {
        "num_str": normalize_numeric_string,
        "float": normalize_float_value,
        "bool": normalize_boolean_value,
        "serie": normalize_serie_nfts,
    }

    def normalize_plain(value):
        """Replace non-breaking spaces and trim the value."""
        return value.replace('\xa0', ' ').strip()

    def build_canonical_fragment(node, order_map):
        """Return the canonical children of ``node``, in ``order_map`` order."""
        canonical_elements = []
        for tag_name, definition in order_map.items():
            original_child = find_child(node, tag_name)
            if original_child is None:
                continue
            if isinstance(definition, str):
                text_value = original_child.text
                if text_value is None:
                    text_value = ""
                normalizer = normalizers.get(definition, normalize_plain)
                final_text_value = normalizer(text_value)
                # Empty values are not part of the signed string.
                if final_text_value == "":
                    continue
                new_child = etree.Element(tag_name)
                new_child.text = final_text_value
                canonical_elements.append(new_child)
            elif isinstance(definition, dict):
                nested_elements = build_canonical_fragment(original_child, definition)
                if nested_elements:
                    new_parent = etree.Element(tag_name)
                    new_parent.extend(nested_elements)
                    canonical_elements.append(new_parent)
        return canonical_elements

    canonical_root = etree.Element("tpNFTS")
    canonical_root.extend(build_canonical_fragment(clean_tp, canonical_order_map))
    signed_xml_bytes = etree.tostring(
        canonical_root,
        encoding="utf-8",
        xml_declaration=False,
        pretty_print=False,
    )
    return signed_xml_bytes.decode("utf-8")
# ---------------- Funções de Assinatura e SOAP ----------------
def sign_bytes_sha1_pkcs1(private_key, data_bytes):
    """Sign ``data_bytes`` with RSA PKCS#1 v1.5 padding and a SHA-1 digest.

    Args:
        private_key: Private key object exposing ``sign(data, padding, hash)``.
        data_bytes: The bytes to sign.

    Returns:
        The raw signature as ``bytes``.
    """
    signature = private_key.sign(
        data_bytes,
        padding.PKCS1v15(),
        hashes.SHA1(),
    )
    return signature
def sign_document_xmlsec(root, key_pem_path, cert_pem_path):
    """Add an enveloped XML-DSig signature covering the whole document.

    A ``ds:Signature`` template (exclusive C14N, RSA-SHA1, SHA-1 digest over
    the whole document, enveloped-signature transform, X509 data) is appended
    to ``root`` and then signed in place.

    Args:
        root: Root element of the document to sign; it is modified in place.
        key_pem_path: Path of the PEM file holding the private key.
        cert_pem_path: Path of the PEM file holding the certificate.

    Raises:
        Exception: Whatever xmlsec raises. In that case the signature
            template is removed again, so ``root`` is left as it was.
    """
    signature_node = xmlsec.template.create(
        root,
        xmlsec.Transform.EXCL_C14N,
        xmlsec.Transform.RSA_SHA1,
        ns='ds',
    )
    root.append(signature_node)
    try:
        ref = xmlsec.template.add_reference(signature_node, xmlsec.Transform.SHA1, uri="")
        xmlsec.template.add_transform(ref, xmlsec.Transform.ENVELOPED)
        key_info = xmlsec.template.ensure_key_info(signature_node)
        xmlsec.template.add_x509_data(key_info)
        ctx = xmlsec.SignatureContext()
        key = xmlsec.Key.from_file(key_pem_path, xmlsec.KeyFormat.PEM)
        key.load_cert_from_file(cert_pem_path, xmlsec.KeyFormat.PEM)
        ctx.key = key
        ctx.sign(signature_node)
    except Exception:
        # Callers may treat this step as optional and serialise the document
        # anyway; never leave an unsigned Signature template behind.
        root.remove(signature_node)
        raise
def build_soap_envelope(xml_string):
    """Wrap ``xml_string`` in a ``TesteEnvioLoteNFTSRequest`` SOAP envelope.

    The request carries ``VersaoSchema`` set to ``2`` and the given XML
    inside ``MensagemXML`` as a CDATA section.

    Args:
        xml_string: The (already signed) XML document as ``str``.

    Returns:
        The envelope serialised as UTF-8 ``bytes``, with XML declaration and
        pretty-printing.
    """
    envelope = etree.Element("{%s}Envelope" % NS_SOAP, nsmap={'soap': NS_SOAP})
    body = etree.SubElement(envelope, "{%s}Body" % NS_SOAP)
    request = etree.SubElement(body, "{%s}TesteEnvioLoteNFTSRequest" % NS_SP)
    etree.SubElement(request, "{%s}VersaoSchema" % NS_SP).text = "2"
    mensagem = etree.SubElement(request, "{%s}MensagemXML" % NS_SP)
    mensagem.text = etree.CDATA(xml_string)
    return etree.tostring(envelope, encoding="utf-8", xml_declaration=True, pretty_print=True)
# ---------------- Main ----------------
def main():
    """Command-line entry point: sign every NFTS and write the SOAP request.

    Expects four arguments: input XML, PFX file, PFX password and output
    file. For each ``NFTS`` element the canonical string is built, dumped to
    the debug directory, signed (RSA-SHA1) and stored base64-encoded in the
    element's ``Assinatura`` child. The whole document is then signed with
    xmlsec on a best-effort basis, wrapped in a SOAP envelope and written to
    the output file. Exits with status 1 on wrong usage or when the input has
    no ``NFTS`` element.
    """
    if len(sys.argv) != 5:
        print("Uso: python sign_nfts.py input_nfts.xml certificado.pfx senha output_soap.xml")
        sys.exit(1)
    input_xml, pfx_file, senha, output_soap = sys.argv[1:5]

    logger.info("Carregando XML de entrada: %s", input_xml)
    parser = etree.XMLParser(remove_blank_text=True)
    tree = etree.parse(input_xml, parser)
    root = tree.getroot()
    debug_dir = ensure_path_for_debug()

    logger.info("Extraindo chave privada e certificado do PFX...")
    private_key, cert = read_pkcs12(pfx_file, senha)

    tp_nodes = root.xpath('//*[local-name()="NFTS"]')
    if not tp_nodes:
        logger.error("Nenhum elemento <NFTS> encontrado no XML.")
        sys.exit(1)
    logger.info("Encontrados %d NFTS nodes", len(tp_nodes))

    # Sign each NFTS individually.
    for idx, tp in enumerate(tp_nodes, start=1):
        logger.info("Gerando string canônica para NFTS #%d...", idx)
        s = build_tpNFTS_string(tp)

        # Debug dump: canonical string plus the series before/after padding.
        dbgpath = os.path.join(debug_dir, f"string_tpNFTS_CANONICAL_{idx}.txt")
        with open(dbgpath, "w", encoding="utf-8") as df:
            df.write(s)
            df.write("\n\n=== DEBUG SerieNFTS ===\n")
            serie_node = tp.xpath('.// *[local-name()="ChaveDocumento"]/*[local-name()="SerieNFTS"]')
            if serie_node:
                serie_canonical = normalize_serie_nfts(serie_node[0].text)
                df.write(f"Original: '{serie_node[0].text}'\n")
                df.write(f"Canonical: '{serie_canonical}' (len={len(serie_canonical)})\n")
            df.write("\n=== REPR COMPLETA ===\n")
            df.write(repr(s))
        logger.info("✅ String canônica (NFTS #%d) com SerieNFTS(5): %s", idx, dbgpath)

        signature_bytes = sign_bytes_sha1_pkcs1(private_key, s.encode("utf-8"))
        signature_b64 = base64.b64encode(signature_bytes).decode("ascii")
        assin = find_child(tp, "Assinatura")
        if assin is None:
            assin = etree.Element("Assinatura")
            tp.append(assin)
        assin.text = signature_b64
        logger.info("✅ Assinatura inserida no NFTS #%d.", idx)

    # xmlsec loads key and certificate from PEM files. They are written to a
    # private temporary directory that is deleted as soon as signing is over,
    # so the unencrypted private key never lingers in the shared temp dir.
    with tempfile.TemporaryDirectory() as pem_dir:
        cert_pem_path = os.path.join(pem_dir, "tmp_cert_nfts.pem")
        key_pem_path = os.path.join(pem_dir, "tmp_key_nfts.pem")
        with open(cert_pem_path, "wb") as f:
            f.write(cert.public_bytes(Encoding.PEM))
        with open(key_pem_path, "wb") as f:
            f.write(private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()))

        # Whole-document signature is optional: a failure is only logged.
        try:
            logger.info("Assinando documento inteiro com xmlsec...")
            sign_document_xmlsec(root, key_pem_path, cert_pem_path)
            logger.info("✅ Assinatura XML adicionada.")
        except Exception as e:
            logger.warning("xmlsec falhou (opcional): %s", e)

    signed_xml_bytes = etree.tostring(root, encoding="utf-8", xml_declaration=True, pretty_print=False)
    signed_xml_str = signed_xml_bytes.decode("utf-8")
    soap_bytes = build_soap_envelope(signed_xml_str)
    with open(output_soap, "wb") as f:
        f.write(soap_bytes)
    logger.info("✅ SOAP TesteEnvioLoteNFTS salvo em: %s", output_soap)
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()