ERRO Assinatura Digital da NFTS incorreta Prefeitura Sao Paulo

12 views
Skip to first unread message

Rodrigo Bueno

unread,
Dec 9, 2025, 7:43:20 AM (13 days ago) Dec 9
to PyNFe
Bom dia.

Sou novo em Phyton então se eu tiver falando besteira desconsiderem.

Estou fazendo assinatura nfts e nFSe prefeitura SAO PAulo devido a reforma tributaria. 

A NFSE foi de boas mas a NFTS nao vai de jeito nenhum.

<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">
   <soap:Body>
      <TesteEnvioLoteNFTSResponse xmlns="http://www.prefeitura.sp.gov.br/nfts">
         <RetornoXML><![CDATA[<?xml version="1.0" encoding="UTF-8"?><RetornoEnvioLoteNFTS xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.prefeitura.sp.gov.br/nfts"><Cabecalho Versao="1" xmlns=""><Sucesso>false</Sucesso><InformacoesLoteNFTS><NumeroLote>0</NumeroLote><Remetente><CPFCNPJ><CNPJ>43643139000166</CNPJ></CPFCNPJ></Remetente><dtEnvioLote>2025-12-09T09:27:56.6963594-03:00</dtEnvioLote><QtdeNFTSProcessadas>0</QtdeNFTSProcessadas><TempoProcessamento>0</TempoProcessamento><ValorTotalServicos>0</ValorTotalServicos></InformacoesLoteNFTS></Cabecalho><ListaRetornoLote xmlns=""><Erro><Codigo>1009</Codigo><Descricao>Assinatura Digital da NFTS incorreta</Descricao><IdentificacaoDocumento><Posicao>1</Posicao><ChaveDocumento><InscricaoMunicipal>10259627</InscricaoMunicipal><SerieNFTS>1</SerieNFTS><NumeroDocumento>12345</NumeroDocumento></ChaveDocumento></IdentificacaoDocumento></Erro></ListaRetornoLote></RetornoEnvioLoteNFTS>]]></RetornoXML>
      </TesteEnvioLoteNFTSResponse>
   </soap:Body>
</soap:Envelope>


 ALguem consegue me ajudar com isso? Vou colocar aqui o codigo .py que desenvolvi.. 


#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Sign NFTS (tpNFTS) strings and produce TesteEnvioLoteNFTS SOAP envelope.

✅ CORREÇÃO FINAL ERRO 1009: SerieNFTS com TAMANHO FIXO 5 (preenchido com espaços à direita)
tpSerieNFTS: Tipo série de documento NFTS. C 1-5 caracteres
"""

import sys
import os
import tempfile
import logging
import base64
import copy
from collections import OrderedDict

from lxml import etree
from cryptography.hazmat.primitives.serialization import pkcs12, Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
import xmlsec

# Configuração de Log
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
logger = logging.getLogger("sign_nfts")

# Namespaces
NS_SP = "http://www.prefeitura.sp.gov.br/nfts"
NS_SOAP = "http://schemas.xmlsoap.org/soap/envelope/"

# ---------------- Funções Auxiliares ----------------

def find_child(parent, tagname):
    """Encontra um filho por nome local, ignorando o namespace."""
    nodes = parent.xpath('./*[local-name() = $name]', name=tagname)
    return nodes[0] if nodes else None

def read_pkcs12(pfx_path, password):
    """Lê a chave privada e o certificado do arquivo PFX."""
    with open(pfx_path, "rb") as f:
        data = f.read()
    private_key, cert, additional = pkcs12.load_key_and_certificates(data, password.encode("utf-8"))
    if private_key is None or cert is None:
        raise RuntimeError("Não foi possível extrair chave privada / certificado do PFX")
    return private_key, cert

def ensure_path_for_debug():
    """Cria um diretório temporário para salvar arquivos de debug."""
    if os.name == 'nt':
        dirpath = r'c:\temp\nfts_debug'
    else:
        dirpath = '/tmp/nfts_debug'
    try:
        os.makedirs(dirpath, exist_ok=True)
    except Exception:
        dirpath = tempfile.gettempdir()
    return dirpath

def normalize_numeric_string(text):
    """
    Remove padding (zeros à esquerda) de *qualquer* string numérica.
    Ex: '001' -> '1', '43643139000166' -> '43643139000166'
    """
    if text is None:
        return ""
   
    clean_text = text.replace('\xa0', ' ').strip()
   
    if clean_text.isdigit():
        try:
            return str(int(clean_text))
        except ValueError:
            pass
    return clean_text

# ✅ CORREÇÃO MANUAL: SerieNFTS tamanho FIXO 5 (preenchido com espaços à direita)
def normalize_serie_nfts(text):
    """
    tpSerieNFTS: Tipo série de documento NFTS. C 1-5 caracteres
    Preenche com espaços à direita até 5 caracteres na STRING CANÔNICA
    """
    if text is None:
        return ""
   
    clean_text = text.replace('\xa0', ' ').strip()
    # 🎯 PREENCHE ATÉ 5 CARACTERES COM ESPAÇOS À DIREITA
    padded = (clean_text + "     ")[:5]  # 5 espaços, corta em 5 chars
    return padded

def normalize_float_value(text):
    """Garante que valores float tenham EXATAMENTE duas casas decimais (ex: 1500.00)."""
    if text is None: return ""
    clean_text = text.replace('\xa0', ' ').replace(',', '.').strip()
    try:
        f_val = float(clean_text)
        return "{:.2f}".format(f_val)
    except ValueError:
        return clean_text

def normalize_boolean_value(text):
    """Força o valor booleano para 'true' ou 'false' (minúsculo)."""
    if text is None: return "false"
    clean_text = text.replace('\xa0', ' ').strip().lower()
    if clean_text in ("true", "1", "s", "sim", "t", "y", "yes"):
        return "true"
    return "false"

# ---------------- Canonicalização para tpNFTS ----------------

def build_tpNFTS_string(tpnode):
    """
    Constrói o fragmento XML canônico para assinatura, conforme as regras da Prefeitura.
    ✅ SerieNFTS: tamanho 5 com espaços à direita
    """
    clean_tp = copy.deepcopy(tpnode)
   
    assin = find_child(clean_tp, "Assinatura")
    if assin is not None:
        clean_tp.remove(assin)

    canonical_order_map = OrderedDict([
        ("TipoDocumento", "num_str"),
        ("ChaveDocumento", {
            "InscricaoMunicipal": "str",
            "SerieNFTS": "serie",  # ✅ TAMANHO 5 COM ESPAÇOS
            "NumeroDocumento": "num_str",
        }),
        ("DataPrestacao", "str"),
        ("StatusNFTS", "str"),
        ("TributacaoNFTS", "str"),
        ("ValorServicos", "float"),
        ("ValorDeducoes", "float"),
        ("CodigoServico", "num_str"),
        ("CodigoSubItem", "num_str"),
        ("AliquotaServicos", "float"),
        ("ISSRetidoTomador", "bool"),
        ("ISSRetidoIntermediario", "bool"),
        ("Prestador", {
            "CPFCNPJ": {
                "CNPJ": "num_str",
                "CPF": "num_str",
            },
            "InscricaoMunicipal": "str",
            "RazaoSocialPrestador": "str",
            "Endereco": {
                "TipoLogradouro": "str",
                "Logradouro": "str",
                "NumeroEndereco": "str",
                "ComplementoEndereco": "str_opt",
                "Bairro": "str",
                "Cidade": "num_str",
                "UF": "str",
                "CEP": "str",
            },
            "Email": "str_opt",
        }),
        ("RegimeTributacao", "num_str"),
        ("DataPagamento", "str_opt"),
        ("Discriminacao", "str"),
        ("TipoNFTS", "num_str"),
        ("Tomador", {
            "CPFCNPJ": {
                "CPF": "num_str",
                "CNPJ": "num_str",
            },
            "RazaoSocial": "str",
        }),
    ])

    def build_canonical_fragment(node, order_map):
        """Constrói o fragmento canônico recursivamente, mantendo a ordem."""
        canonical_elements = []
       
        for tag_name, definition in order_map.items():
            original_child = find_child(node, tag_name)
            if original_child is None:
                continue

            if isinstance(definition, str):
                text_value = original_child.text
                if text_value is None:
                    text_value = ""
               
                # ✅ NORMALIZAÇÃO ESPECIAL PARA SERIE (tamanho 5)
                if definition == "num_str":
                    final_text_value = normalize_numeric_string(text_value)
                elif definition == "float":
                    final_text_value = normalize_float_value(text_value)
                elif definition == "bool":
                    final_text_value = normalize_boolean_value(text_value)
                elif definition == "serie":
                    final_text_value = normalize_serie_nfts(text_value)  # 🎯 5 chars!
                else:
                    final_text_value = text_value.replace('\xa0', ' ').strip()

                if final_text_value == "":
                    continue

                new_child = etree.Element(tag_name)
                new_child.text = final_text_value
                canonical_elements.append(new_child)
           
            elif isinstance(definition, dict):
                nested_elements = build_canonical_fragment(original_child, definition)
                if nested_elements:
                    new_parent = etree.Element(tag_name)
                    new_parent.extend(nested_elements)
                    canonical_elements.append(new_parent)
       
        return canonical_elements

    canonical_root = etree.Element("tpNFTS")
    children_in_order = build_canonical_fragment(clean_tp, canonical_order_map)
    canonical_root.extend(children_in_order)
   
    signed_xml_bytes = etree.tostring(
        canonical_root,
        encoding="utf-8",
        xml_declaration=False,
        pretty_print=False
    )
   
    return signed_xml_bytes.decode("utf-8")

# ---------------- Funções de Assinatura e SOAP ----------------

def sign_bytes_sha1_pkcs1(private_key, data_bytes):
    """Assina os bytes usando SHA1 e RSA PKCS#1 v1.5 (conforme o manual)."""
    signature = private_key.sign(
        data_bytes,
        padding.PKCS1v15(),
        hashes.SHA1()
    )
    return signature

def sign_document_xmlsec(root, key_pem_path, cert_pem_path):
    """Assina o documento inteiro (PedidoEnvioLoteNFTS) usando xmldsig (opcional)."""
    signature_node = xmlsec.template.create(
        root,
        xmlsec.Transform.EXCL_C14N,
        xmlsec.Transform.RSA_SHA1,
        ns='ds'
    )
    root.append(signature_node)

    ref = xmlsec.template.add_reference(signature_node, xmlsec.Transform.SHA1, uri="")
    xmlsec.template.add_transform(ref, xmlsec.Transform.ENVELOPED)

    key_info = xmlsec.template.ensure_key_info(signature_node)
    xmlsec.template.add_x509_data(key_info)

    ctx = xmlsec.SignatureContext()
    key = xmlsec.Key.from_file(key_pem_path, xmlsec.KeyFormat.PEM)
    key.load_cert_from_file(cert_pem_path, xmlsec.KeyFormat.PEM)
    ctx.key = key
    ctx.sign(signature_node)

def build_soap_envelope(xml_string):
    """Constrói o envelope SOAP TesteEnvioLoteNFTS com o XML assinado em CDATA."""
    envelope = etree.Element("{%s}Envelope" % NS_SOAP, nsmap={'soap': NS_SOAP})
    body = etree.SubElement(envelope, "{%s}Body" % NS_SOAP)
    request = etree.SubElement(body, "{%s}TesteEnvioLoteNFTSRequest" % NS_SP)
    etree.SubElement(request, "{%s}VersaoSchema" % NS_SP).text = "2"
    mensagem = etree.SubElement(request, "{%s}MensagemXML" % NS_SP)
    mensagem.text = etree.CDATA(xml_string)
   
    return etree.tostring(envelope, encoding="utf-8", xml_declaration=True, pretty_print=True)

# ---------------- Main ----------------

def main():
    if len(sys.argv) != 5:
        print("Uso: python sign_nfts.py input_nfts.xml certificado.pfx senha output_soap.xml")
        sys.exit(1)

    input_xml = sys.argv[1]
    pfx_file = sys.argv[2]
    senha = sys.argv[3]
    output_soap = sys.argv[4]

    logger.info("Carregando XML de entrada: %s", input_xml)
    parser = etree.XMLParser(remove_blank_text=True)
    tree = etree.parse(input_xml, parser)
    root = tree.getroot()

    tmpdir = tempfile.gettempdir()
    debug_dir = ensure_path_for_debug()

    logger.info("Extraindo chave privada e certificado do PFX...")
    private_key, cert = read_pkcs12(pfx_file, senha)

    cert_pem_path = os.path.join(tmpdir, "tmp_cert_nfts.pem")
    key_pem_path = os.path.join(tmpdir, "tmp_key_nfts.pem")
    with open(cert_pem_path, "wb") as f:
        f.write(cert.public_bytes(Encoding.PEM))
    with open(key_pem_path, "wb") as f:
        f.write(private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()))

    tp_nodes = root.xpath('//*[local-name()="NFTS"]')
   
    if not tp_nodes:
        logger.error("Nenhum elemento <NFTS> encontrado no XML.")
        sys.exit(1)
    else:
        logger.info("Encontrados %d NFTS nodes", len(tp_nodes))

    # Gera assinatura para cada NFTS
    for idx, tp in enumerate(tp_nodes, start=1):
        logger.info("Gerando string canônica para NFTS #%d...", idx)
        s = build_tpNFTS_string(tp)
       
        # DEBUG: Mostra SerieNFTS formatada (5 chars)
        dbgpath = os.path.join(debug_dir, f"string_tpNFTS_CANONICAL_{idx}.txt")
        with open(dbgpath, "w", encoding="utf-8") as df:
            df.write(s)
            df.write("\n\n=== DEBUG SerieNFTS ===\n")
            serie_node = tp.xpath('.// *[local-name()="ChaveDocumento"]/*[local-name()="SerieNFTS"]')
            if serie_node:
                df.write(f"Original: '{serie_node[0].text}'\n")
                df.write(f"Canonical: '{normalize_serie_nfts(serie_node[0].text)}' (len={len(normalize_serie_nfts(serie_node[0].text))})\n")
            df.write("\n=== REPR COMPLETA ===\n")
            df.write(repr(s))
           
        logger.info("✅ String canônica (NFTS #%d) com SerieNFTS(5): %s", idx, dbgpath)

        signature_bytes = sign_bytes_sha1_pkcs1(private_key, s.encode("utf-8"))
        signature_b64 = base64.b64encode(signature_bytes).decode("ascii")

        assin = find_child(tp, "Assinatura")
        if assin is None:
            assin = etree.Element("Assinatura")
            tp.append(assin)
        assin.text = signature_b64
        logger.info("✅ Assinatura inserida no NFTS #%d.", idx)

    # Assina o documento inteiro (opcional)
    try:
        logger.info("Assinando documento inteiro com xmlsec...")
        sign_document_xmlsec(root, key_pem_path, cert_pem_path)
        logger.info("✅ Assinatura XML adicionada.")
    except Exception as e:
        logger.warning("xmlsec falhou (opcional): %s", e)

    signed_xml_bytes = etree.tostring(root, encoding="utf-8", xml_declaration=True, pretty_print=False)
    signed_xml_str = signed_xml_bytes.decode("utf-8")
    soap_bytes = build_soap_envelope(signed_xml_str)

    with open(output_soap, "wb") as f:
        f.write(soap_bytes)

    logger.info("✅ SOAP TesteEnvioLoteNFTS salvo em: %s", output_soap)


if __name__ == "__main__":
    main()

Att,

Rodrigo Bueno







Reply all
Reply to author
Forward
0 new messages