Acionar uma execução de pipeline com o Cloud Pub/Sub

Os exemplos de código a seguir mostram como gravar, implantar e acionar um pipeline usando um gatilho baseado em eventos no Cloud Functions com um gatilho do Cloud Pub/Sub.

Criar e compilar um pipeline simples

Com o SDK do Kubeflow Pipelines, crie um pipeline programado e compile-o em um arquivo YAML.

Amostra hello-world-scheduled-pipeline:

from kfp import compiler
from kfp import dsl

# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(greet_name)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')

Fazer upload do YAML do pipeline compilado no bucket do Cloud Storage

  1. Abra o navegador do Cloud Storage no Console do Google Cloud.

    Navegador do Cloud Storage

  2. Clique no bucket do Cloud Storage criado quando você configurou o projeto.

  3. Usando uma pasta atual ou uma nova, faça upload do YAML do pipeline compilado (neste exemplo, hello_world_scheduled_pipeline.yaml) para a pasta selecionada.

  4. Clique no arquivo YAML enviado para acessar os detalhes. Copie o URI da gsutil para uso posterior.

Criar um Cloud Function com gatilho do Pub/Sub

  1. Acesse a página Cloud Functions no console.

    Acessar a página do Cloud Functions

  2. Clique no botão Criar função.

  3. Na seção Noções básicas, atribua um nome à função (por exemplo, my-scheduled-pipeline-function).

  4. Na seção Gatilho, selecione Cloud Pub/Sub como o tipo de gatilho.

    criar configuração da função escolha pubsub como imagem do tipo gatilho

  5. Na lista suspensa Selecionar um tópico do Cloud Pub/Sub, clique em Criar um tópico.

  6. Na caixa Criar um tópico, dê um nome ao novo tópico (por exemplo, my-scheduled-pipeline-topic) e selecione Criar tópico.

  7. Deixe todos os outros campos como padrão e clique em Salvar para salvar a configuração da seção gatilho.

  8. Deixe todos os outros campos como padrão e clique em Next para acessar a seção "Code".

  9. Em Ambiente de execução, selecione Python 3.7.

  10. Em Entry, insira "subscribe" (exemplo de nome da função do ponto de entrada do código).

  11. Em Código-fonte, selecione Editor in-line, se essa opção ainda não estiver selecionada.

  12. No arquivo main.py, adicione o seguinte código:

      import base64
      import json
      from google.cloud import aiplatform
    
      PROJECT_ID = 'your-project-id'                     # <---CHANGE THIS
      REGION = 'your-region'                             # <---CHANGE THIS
      PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS
    
      def subscribe(event, context):
        """Triggered from a message on a Cloud Pub/Sub topic.
        Args:
              event (dict): Event payload.
              context (google.cloud.functions.Context): Metadata for the event.
        """
        # decode the event payload string
        payload_message = base64.b64decode(event['data']).decode('utf-8')
        # parse payload string into JSON object
        payload_json = json.loads(payload_message)
        # trigger pipeline run with payload
        trigger_pipeline_run(payload_json)
    
      def trigger_pipeline_run(payload_json):
        """Triggers a pipeline run
        Args:
              payload_json: expected in the following format:
                {
                  "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
                  "parameter_values": {
                    "greet_name": "<any-greet-string>"
                  }
                }
        """
        pipeline_spec_uri = payload_json['pipeline_spec_uri']
        parameter_values = payload_json['parameter_values']
    
        # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
        aiplatform.init(
            project=PROJECT_ID,
            location=REGION,
        )
        job = aiplatform.PipelineJob(
            display_name='hello-world-pipeline-cloud-function-invocation',
            template_path=pipeline_spec_uri,
            pipeline_root=PIPELINE_ROOT,
            enable_caching=False,
            parameter_values=parameter_values
        )
    
        # Submit the PipelineJob
        job.submit()
    

    Substitua:

    • PROJECT_ID: o projeto do Google Cloud em que este pipeline é executado.
    • REGION: a região em que este pipeline é executado.
    • PIPELINE_ROOT: especifique um URI do Cloud Storage que sua conta de serviço de pipelines possa acessar. Os artefatos das execuções de pipeline são armazenados na raiz do pipeline.
  13. No arquivo requirements.txt, substitua o conteúdo pelos seguintes requisitos de pacote:

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    
  14. Clique em implantar para implantar a função.