Criar um pipeline para treinamento contínuo de modelos

Neste documento, apresentamos as etapas necessárias para criar um pipeline que treina automaticamente um modelo personalizado sempre que novos dados são inseridos no conjunto de dados usando o Vertex AI Pipelines e o Cloud Functions.

Objetivos

As etapas a seguir abrangem esse processo:

  1. adquirir e preparar um conjunto de dados no BigQuery

  2. Criar e fazer upload de um pacote de treinamento personalizado Quando executada, ela lê dados do conjunto e treina o modelo.

  3. Criar um pipeline da Vertex AI. Esse pipeline executa o pacote de treinamento personalizado, faz upload do modelo para o Vertex AI Model Registry, executa o job de avaliação e envia uma notificação por e-mail.

  4. Execute o pipeline manualmente.

  5. Criar uma função do Cloud com um gatilho do Eventarc que executa o pipeline sempre que novos dados são inseridos no conjunto de dados do BigQuery.

Antes de começar

Configure o projeto e o notebook.

Configurar o projeto

  1. No console do Google Cloud, acesse a página do seletor de projetos.

    Acessar o seletor de projetos

  2. Selecione ou crie um projeto do Google Cloud.

  3. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

Criar notebook

Usamos um bloco do Colab Enterprise para executar uma parte do código deste tutorial.

  1. Se você não for o proprietário do projeto, peça a um proprietário que conceda a você os papéis roles/resourcemanager.projectIamAdmin e roles/aiplatform.colabEnterpriseUser do IAM.

    Você precisa ter esses papéis para usar o Colab Enterprise e conceder papéis e permissões do IAM a você e a contas de serviço.

    Acessar IAM

  2. No console do Google Cloud, acesse a página Notebooks do Colab Enterprise.

    O Colab Enterprise vai solicitar que você ative as APIs necessárias a seguir, caso ainda não estejam ativadas.

    • API Vertex AI
    • API Dataform
    • API Compute Engine

    Acessar o Colab Enterprise

  3. No menu Região, selecione a região em que você quer criar o notebook. Se você não tiver certeza, use us-central1 como região.

    Use a mesma região para todos os recursos neste tutorial.

  4. Clique em Criar um novo notebook.

O novo notebook aparece na guia Meus notebooks. Para executar o código no notebook, adicione uma célula de código e clique no botão  Executar célula.

Configurar o ambiente de desenvolvimento

  1. No seu notebook, instale os pacotes Python3 a seguir.

    ! pip3 install  google-cloud-aiplatform==1.34.0 \
                    google-cloud-pipeline-components==2.6.0 \
                    kfp==2.4.0 \
                    scikit-learn==1.0.2 \
                    mlflow==2.10.0
    
  2. Execute o comando a seguir para definir o projeto da CLI do Google Cloud:

    PROJECT_ID = "PROJECT_ID"
    
    # Set the project id
    ! gcloud config set project {PROJECT_ID}
    

    Substitua PROJECT_ID pela ID do seu projeto. Se necessário, localize o ID do projeto no console do Google Cloud.

  3. Atribua os papéis à sua Conta do Google:

    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/bigquery.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.user
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/storage.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/pubsub.editor
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/cloudfunctions.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.viewer
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/logging.configWriter
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/iam.serviceAccountUser
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/eventarc.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/aiplatform.colabEnterpriseUser
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/artifactregistry.admin
    ! gcloud projects add-iam-policy-binding PROJECT_ID --member="user:"EMAIL_ADDRESS"" --role=roles/serviceusage.serviceUsageAdmin
    
  4. Ative as APIs a seguir

    • API Artifact Registry
    • API BigQuery
    • API Cloud Build
    • API Cloud Functions
    • API Cloud Logging
    • API Pub/Sub
    • API Cloud Run Admin
    • API Cloud Storage
    • API Eventarc
    • API Service Usage
    • API Vertex AI
    ! gcloud services enable artifactregistry.googleapis.com bigquery.googleapis.com cloudbuild.googleapis.com cloudfunctions.googleapis.com logging.googleapis.com pubsub.googleapis.com run.googleapis.com storage-component.googleapis.com  eventarc.googleapis.com serviceusage.googleapis.com aiplatform.googleapis.com
    

  5. Conceda papéis às contas de serviço do seu projeto:

    1. Ver os nomes das suas contas de serviço

      ! gcloud iam service-accounts list
      

      Anote o nome do seu agente de serviço do Compute. Ele precisa estar no formato [email protected].

    2. Conceda os papéis necessários à conta de serviço.

      ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID[email protected]"" --role=roles/aiplatform.serviceAgent
      ! gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:"SA_ID[email protected]"" --role=roles/eventarc.eventReceiver
      

Adquirir e preparar conjunto de dados

Neste tutorial, você criará um modelo que prevê a tarifa de uma corrida de táxi com base em recursos como tempo de viagem, local e distância. Usaremos o conjunto de dados público Chicago Taxi Trips. Esse conjunto de dados inclui corridas de táxi de 2013 até o presente, reportadas à agência reguladora da cidade de Chicago. Para proteger a privacidade dos motoristas e usuários da táxi ao mesmo tempo e permitir que o agregador analise os dados, o ID do táxi é mantido consistente para qualquer número de medalhas, mas não mostra o{101 }, os Census Tracts são suprimidos em alguns casos e os tempos são arredondados para os 15 minutos mais próximos.

Para mais informações, confira Viagens de táxi de Chicago no Marketplace.

Criar um conjunto de dados do BigQuery

  1. No console do Google Cloud, acesse o BigQuery Studio.

    Acessar o BigQuery

  2. No painel Explorer, localize seu projeto, clique em Ações e, em seguida, clique em Criar conjunto de dados.

  3. Na página Criar conjunto de dados:

    • Para o código do conjunto de dados, insira mlops. Para mais informações, consulte Nomenclatura do conjunto de dados.

    • Em Tipo de local, escolha sua multirregião. Por exemplo, escolha US (várias regiões nos Estados Unidos) se estiver usando us-central1. Após a criação de um conjunto de dados, o local não pode ser alterado.

    • Clique em Criar conjunto de dados.

Para saber mais, confira como criar conjuntos de dados.

Criar e preencher a tabela do BigQuery

Nesta seção, você cria a tabela e importa os dados de um ano do conjunto de dados público para o conjunto de dados do projeto.

  1. Acessar o BigQuery Studio

    Acessar o BigQuery

  2. Clique em Criar consulta SQL e execute a consulta SQL a seguir clicando em Executar.

    CREATE OR REPLACE TABLE `PROJECT_ID.mlops.chicago`
    AS (
        WITH
          taxitrips AS (
          SELECT
            trip_start_timestamp,
            trip_end_timestamp,
            trip_seconds,
            trip_miles,
            payment_type,
            pickup_longitude,
            pickup_latitude,
            dropoff_longitude,
            dropoff_latitude,
            tips,
            tolls,
            fare,
            pickup_community_area,
            dropoff_community_area,
            company,
            unique_key
          FROM
            `bigquery-public-data.chicago_taxi_trips.taxi_trips`
          WHERE pickup_longitude IS NOT NULL
          AND pickup_latitude IS NOT NULL
          AND dropoff_longitude IS NOT NULL
          AND dropoff_latitude IS NOT NULL
          AND trip_miles > 0
          AND trip_seconds > 0
          AND fare > 0
          AND EXTRACT(YEAR FROM trip_start_timestamp) = 2019
        )
    
        SELECT
          trip_start_timestamp,
          EXTRACT(MONTH from trip_start_timestamp) as trip_month,
          EXTRACT(DAY from trip_start_timestamp) as trip_day,
          EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
          EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
          trip_seconds,
          trip_miles,
          payment_type,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
          ) AS pickup_grid,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
          ) AS dropoff_grid,
          ST_Distance(
              ST_GeogPoint(pickup_longitude, pickup_latitude),
              ST_GeogPoint(dropoff_longitude, dropoff_latitude)
          ) AS euclidean,
          CONCAT(
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude,
                  pickup_latitude), 0.1)),
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude,
                  dropoff_latitude), 0.1))
          ) AS loc_cross,
          IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
          tips,
          tolls,
          fare,
          pickup_longitude,
          pickup_latitude,
          dropoff_longitude,
          dropoff_latitude,
          pickup_community_area,
          dropoff_community_area,
          company,
          unique_key,
          trip_end_timestamp
        FROM
          taxitrips
        LIMIT 1000000
    )
    

    Essa consulta cria a tabela <PROJECT_ID>.mlops.chicago e a preenche com dados da tabela pública bigquery-public-data.chicago_taxi_trips.taxi_trips.

  3. Para exibir o esquema da tabela, clique em Acessar a tabela e depois na guia Esquema.

  4. Para acessar o conteúdo da tabela, clique na guia Visualização.

Criar e fazer upload do pacote de treinamento personalizado

Nesta seção, você cria um pacote do Python que contém o código que lê o conjunto de dados, divide os dados em conjuntos de treinamento e teste e treina seu modelo personalizado. O pacote será executado como uma das tarefas do pipeline. Para mais informações, consulte Como criar um aplicativo de treinamento em Python para um contêiner pré-criado.

Criar o pacote de treinamento personalizado

  1. No bloco do Colab, crie pastas mãe para o aplicativo de treinamento:

    !mkdir -p training_package/trainer
    
  2. Crie um arquivo __init__.py em cada pasta para transformá-lo em um pacote usando o seguinte comando:

    ! touch training_package/__init__.py
    ! touch training_package/trainer/__init__.py
    

    Confira os novos arquivos e pastas no painel Arquivos pasta.

  3. No painel Arquivos, crie um arquivo chamado task.py na pasta training_package/trainer com o conteúdo a seguir.

    # Import the libraries
    from sklearn.model_selection import train_test_split, cross_val_score
    from sklearn.preprocessing import OneHotEncoder, StandardScaler
    from google.cloud import bigquery, bigquery_storage
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    from google import auth
    from scipy import stats
    import numpy as np
    import argparse
    import joblib
    import pickle
    import csv
    import os
    
    # add parser arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--project-id', dest='project_id',  type=str, help='Project ID.')
    parser.add_argument('--training-dir', dest='training_dir', default=os.getenv("AIP_MODEL_DIR"),
                        type=str, help='Dir to save the data and the trained model.')
    parser.add_argument('--bq-source', dest='bq_source',  type=str, help='BigQuery data source for training data.')
    args = parser.parse_args()
    
    # data preparation code
    BQ_QUERY = """
    with tmp_table as (
    SELECT trip_seconds, trip_miles, fare,
        tolls,  company,
        pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude,
        DATETIME(trip_start_timestamp, 'America/Chicago') trip_start_timestamp,
        DATETIME(trip_end_timestamp, 'America/Chicago') trip_end_timestamp,
        CASE WHEN (pickup_community_area IN (56, 64, 76)) OR (dropoff_community_area IN (56, 64, 76)) THEN 1 else 0 END is_airport,
    FROM `{}`
    WHERE
      dropoff_latitude IS NOT NULL and
      dropoff_longitude IS NOT NULL and
      pickup_latitude IS NOT NULL and
      pickup_longitude IS NOT NULL and
      fare > 0 and
      trip_miles > 0
      and MOD(ABS(FARM_FINGERPRINT(unique_key)), 100) between 0 and 99
    ORDER BY RAND()
    LIMIT 10000)
    SELECT *,
        EXTRACT(YEAR FROM trip_start_timestamp) trip_start_year,
        EXTRACT(MONTH FROM trip_start_timestamp) trip_start_month,
        EXTRACT(DAY FROM trip_start_timestamp) trip_start_day,
        EXTRACT(HOUR FROM trip_start_timestamp) trip_start_hour,
        FORMAT_DATE('%a', DATE(trip_start_timestamp)) trip_start_day_of_week
    FROM tmp_table
    """.format(args.bq_source)
    # Get default credentials
    credentials, project = auth.default()
    bqclient = bigquery.Client(credentials=credentials, project=args.project_id)
    bqstorageclient = bigquery_storage.BigQueryReadClient(credentials=credentials)
    df = (
        bqclient.query(BQ_QUERY)
        .result()
        .to_dataframe(bqstorage_client=bqstorageclient)
    )
    # Add 'N/A' for missing 'Company'
    df.fillna(value={'company':'N/A','tolls':0}, inplace=True)
    # Drop rows containing null data.
    df.dropna(how='any', axis='rows', inplace=True)
    # Pickup and dropoff locations distance
    df['abs_distance'] = (np.hypot(df['dropoff_latitude']-df['pickup_latitude'], df['dropoff_longitude']-df['pickup_longitude']))*100
    
    # Remove extremes, outliers
    possible_outliers_cols = ['trip_seconds', 'trip_miles', 'fare', 'abs_distance']
    df=df[(np.abs(stats.zscore(df[possible_outliers_cols].astype(float))) < 3).all(axis=1)].copy()
    # Reduce location accuracy
    df=df.round({'pickup_latitude': 3, 'pickup_longitude': 3, 'dropoff_latitude':3, 'dropoff_longitude':3})
    
    # Drop the timestamp col
    X=df.drop(['trip_start_timestamp', 'trip_end_timestamp'],axis=1)
    
    # Split the data into train and test
    X_train, X_test = train_test_split(X, test_size=0.10, random_state=123)
    
    ## Format the data for batch predictions
    # select string cols
    string_cols = X_test.select_dtypes(include='object').columns
    # Add quotes around string fields
    X_test[string_cols] = X_test[string_cols].apply(lambda x: '\"' + x + '\"')
    # Add quotes around column names
    X_test.columns = ['\"' + col + '\"' for col in X_test.columns]
    # Save DataFrame to csv
    X_test.to_csv(os.path.join(args.training_dir,"test.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')
    # Save test data without the target for batch predictions
    X_test.drop('\"fare\"',axis=1,inplace=True)
    X_test.to_csv(os.path.join(args.training_dir,"test_no_target.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')
    
    # Separate the target column
    y_train=X_train.pop('fare')
    # Get the column indexes
    col_index_dict = {col: idx for idx, col in enumerate(X_train.columns)}
    # Create a column transformer pipeline
    ct_pipe = ColumnTransformer(transformers=[
        ('hourly_cat', OneHotEncoder(categories=[range(0,24)], sparse = False), [col_index_dict['trip_start_hour']]),
        ('dow', OneHotEncoder(categories=[['Mon', 'Tue', 'Sun', 'Wed', 'Sat', 'Fri', 'Thu']], sparse = False), [col_index_dict['trip_start_day_of_week']]),
        ('std_scaler', StandardScaler(), [
            col_index_dict['trip_start_year'],
            col_index_dict['abs_distance'],
            col_index_dict['pickup_longitude'],
            col_index_dict['pickup_latitude'],
            col_index_dict['dropoff_longitude'],
            col_index_dict['dropoff_latitude'],
            col_index_dict['trip_miles'],
            col_index_dict['trip_seconds']])
    ])
    # Add the random-forest estimator to the pipeline
    rfr_pipe = Pipeline([
        ('ct', ct_pipe),
        ('forest_reg', RandomForestRegressor(
            n_estimators = 20,
            max_features = 1.0,
            n_jobs = -1,
            random_state = 3,
            max_depth=None,
            max_leaf_nodes=None,
        ))
    ])
    
    # train the model
    rfr_score = cross_val_score(rfr_pipe, X_train, y_train, scoring = 'neg_mean_squared_error', cv = 5)
    rfr_rmse = np.sqrt(-rfr_score)
    print ("Crossvalidation RMSE:",rfr_rmse.mean())
    final_model=rfr_pipe.fit(X_train, y_train)
    # Save the model pipeline
    with open(os.path.join(args.training_dir,"model.pkl"), 'wb') as model_file:
        pickle.dump(final_model, model_file)
    

    O código realiza as seguintes tarefas:

    1. Seleção de atributos.
    2. Transformação do horário dos dados de embarque e desembarque de UTC para o horário local de Chicago.
    3. Extração da data, hora, dia da semana, mês e ano da data e hora de retirada.
    4. Calcular a duração da viagem usando os horários de início e término.
    5. Identificar e marcar viagens que começaram ou terminaram em um aeroporto com base nas áreas da comunidade.
    6. O modelo de regressão de floresta aleatória é treinado para prever a tarifa da viagem de táxi usando o framework scikit-learn.
    7. O modelo treinado é salvo em um arquivo pickle model.pkl.

      A abordagem e a engenharia de atributos selecionadas são baseadas na exploração e análise de dados em Como prever tarifas de táxi de Chicago.

  4. No painel Arquivos, crie um arquivo chamado setup.py na pasta training_package com o conteúdo a seguir.

    from setuptools import find_packages
    from setuptools import setup
    
    REQUIRED_PACKAGES=["google-cloud-bigquery[pandas]","google-cloud-bigquery-storage"]
    setup(
        name='trainer',
        version='0.1',
        install_requires=REQUIRED_PACKAGES,
        packages=find_packages(),
        include_package_data=True,
        description='Training application package for chicago taxi trip fare prediction.'
    )
    
  5. No notebook, execute setup.py para criar a distribuição de origem do aplicativo de treinamento:

    ! cd training_package && python setup.py sdist --formats=gztar && cd ..
    

No final desta seção, o painel Arquivos vai conter os seguintes arquivos e pastas em training-package.

dist
  trainer-0.1.tar.gz
trainer
  __init__.py
  task.py
trainer.egg-info
__init__.py
setup.py

Faça o upload do pacote de treinamento personalizado para o Cloud Storage

  1. Criar um bucket do Cloud Storage.

    REGION="REGION"
    BUCKET_NAME = "BUCKET_NAME"
    BUCKET_URI = f"gs://{BUCKET_NAME}"
    
    ! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI
    

    Substitua os seguintes valores de parâmetro:

    • REGION: escolha a mesma região ao criar o notebook do Colab.

    • BUCKET_NAME: O nome do bloco.

  2. Faça o upload do pacote de treinamento no bucket do Cloud Storage.

    # Copy the training package to the bucket
    ! gsutil cp training_package/dist/trainer-0.1.tar.gz $BUCKET_URI/
    

Crie o pipeline

Um pipeline é uma descrição de um fluxo de trabalho de MLOps como um gráfico de etapas chamadas tarefas de pipeline.

Nesta seção, você define as tarefas do pipeline, compila-as em YAML e registra seu pipeline no Artifact Registry para que ele possa ser controlado por versões e executado várias vezes, por um único usuário ou por vários usuários.

Veja a seguir uma visualização das tarefas em nosso pipeline, incluindo treinamento de modelo, upload de modelo, avaliação de modelo e notificação por e-mail:

Visualização do pipeline

Para mais informações, consulte Como criar modelos de pipeline.

Definir constantes e inicializar clientes

  1. No notebook, defina as constantes que serão usadas nas etapas posteriores:

    import os
    
    EMAIL_RECIPIENTS = [ "NOTIFY_EMAIL" ]
    PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI)
    PIPELINE_NAME = "vertex-pipeline-datatrigger-tutorial"
    WORKING_DIR = f"{PIPELINE_ROOT}/mlops-datatrigger-tutorial"
    os.environ['AIP_MODEL_DIR'] = WORKING_DIR
    EXPERIMENT_NAME = PIPELINE_NAME + "-experiment"
    PIPELINE_FILE = PIPELINE_NAME + ".yaml"
    

    Substitua NOTIFY_EMAIL por um endereço de e-mail. Quando o job do pipeline é concluído, com ou sem sucesso, um e-mail é enviado para esse endereço.

  2. Inicialize o SDK da Vertex AI com o projeto, o bucket de preparo, o local e o experimento.

    from google.cloud import aiplatform
    
    aiplatform.init(
        project=PROJECT_ID,
        staging_bucket=BUCKET_URI,
        location=REGION,
        experiment=EXPERIMENT_NAME)
    
    aiplatform.autolog()
    

Definir as tarefas do pipeline

No notebook, defina o pipeline custom_model_training_evaluation_pipeline:

from kfp import dsl
from kfp.dsl import importer
from kfp.dsl import OneOf
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.model_evaluation import ModelEvaluationRegressionOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp
from google.cloud import aiplatform

# define the train-deploy pipeline
@dsl.pipeline(name="custom-model-training-evaluation-pipeline")
def custom_model_training_evaluation_pipeline(
    project: str,
    location: str,
    training_job_display_name: str,
    worker_pool_specs: list,
    base_output_dir: str,
    prediction_container_uri: str,
    model_display_name: str,
    batch_prediction_job_display_name: str,
    target_field_name: str,
    test_data_gcs_uri: list,
    ground_truth_gcs_source: list,
    batch_predictions_gcs_prefix: str,
    batch_predictions_input_format: str="csv",
    batch_predictions_output_format: str="jsonl",
    ground_truth_format: str="csv",
    parent_model_resource_name: str=None,
    parent_model_artifact_uri: str=None,
    existing_model: bool=False

):
    # Notification task
    notify_task = VertexNotificationEmailOp(
                    recipients= EMAIL_RECIPIENTS
                    )
    with dsl.ExitHandler(notify_task, name='MLOps Continuous Training Pipeline'):
        # Train the model
        custom_job_task = CustomTrainingJobOp(
                                    project=project,
                                    display_name=training_job_display_name,
                                    worker_pool_specs=worker_pool_specs,
                                    base_output_directory=base_output_dir,
                                    location=location
                            )

        # Import the unmanaged model
        import_unmanaged_model_task = importer(
                                        artifact_uri=base_output_dir,
                                        artifact_class=artifact_types.UnmanagedContainerModel,
                                        metadata={
                                            "containerSpec": {
                                                "imageUri": prediction_container_uri,
                                            },
                                        },
                                    ).after(custom_job_task)

        with dsl.If(existing_model == True):
            # Import the parent model to upload as a version
            import_registry_model_task = importer(
                                        artifact_uri=parent_model_artifact_uri,
                                        artifact_class=artifact_types.VertexModel,
                                        metadata={
                                            "resourceName": parent_model_resource_name
                                        },
                                    ).after(import_unmanaged_model_task)
            # Upload the model as a version
            model_version_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    parent_model=import_registry_model_task.outputs["artifact"],
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )

        with dsl.Else():
            # Upload the model
            model_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )
        # Get the model (or model version)
        model_resource = OneOf(model_version_upload_op.outputs["model"], model_upload_op.outputs["model"])

        # Batch prediction
        batch_predict_task = ModelBatchPredictOp(
                            project= project,
                            job_display_name= batch_prediction_job_display_name,
                            model= model_resource,
                            location= location,
                            instances_format= batch_predictions_input_format,
                            predictions_format= batch_predictions_output_format,
                            gcs_source_uris= test_data_gcs_uri,
                            gcs_destination_output_uri_prefix= batch_predictions_gcs_prefix,
                            machine_type= 'n1-standard-2'
                            )
        # Evaluation task
        evaluation_task = ModelEvaluationRegressionOp(
                            project= project,
                            target_field_name= target_field_name,
                            location= location,
                            # model= model_resource,
                            predictions_format= batch_predictions_output_format,
                            predictions_gcs_source= batch_predict_task.outputs["gcs_output_directory"],
                            ground_truth_format= ground_truth_format,
                            ground_truth_gcs_source= ground_truth_gcs_source
                            )
    return

O pipeline consiste em um gráfico de tarefas que usam os seguintes componentes de pipeline do Google Cloud:

Compile o pipeline.

Compile o pipeline usando o compilador do Kubeflow Pipelines (KFP) em um arquivo YAML contendo uma representação hermética do pipeline.

from kfp import dsl
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=custom_model_training_evaluation_pipeline,
    package_path="{}.yaml".format(PIPELINE_NAME),
)

Você verá um arquivo YAML chamado vertex-pipeline-datatrigger-tutorial.yaml no diretório de trabalho.

Fazer upload do pipeline como um modelo

  1. Crie um repositório do tipo KFP no Artifact Registry.

    REPO_NAME = "mlops"
    # Create a repo in the artifact registry
    ! gcloud artifacts repositories create $REPO_NAME --location=$REGION --repository-format=KFP
    
  2. Faça upload do pipeline compilado para o repositório.

    from kfp.registry import RegistryClient
    
    host = f"http://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}"
    client = RegistryClient(host=host)
    TEMPLATE_NAME, VERSION_NAME = client.upload_pipeline(
    file_name=PIPELINE_FILE,
    tags=["v1", "latest"],
    extra_headers={"description":"This is an example pipeline template."})
    TEMPLATE_URI = f"http://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
    
  3. No console do Google Cloud, verifique se o modelo aparece em Modelos de pipeline.

    Acessar "Modelos de pipelines"

Executar o pipeline manualmente

Para garantir que o pipeline funcione, execute-o manualmente.

  1. No notebook, especifique os parâmetros necessários para executar o pipeline como um job.

    DATASET_NAME = "mlops"
    TABLE_NAME = "chicago"
    
    worker_pool_specs = [{
                            "machine_spec": {"machine_type": "e2-highmem-2"},
                            "replica_count": 1,
                            "python_package_spec":{
                                    "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
                                    "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
                                    "python_module": "trainer.task",
                                    "args":["--project-id",PROJECT_ID, "--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"]
                            },
    }]
    
    parameters = {
        "project": PROJECT_ID,
        "location": REGION,
        "training_job_display_name": "taxifare-prediction-training-job",
        "worker_pool_specs": worker_pool_specs,
        "base_output_dir": BUCKET_URI,
        "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
        "model_display_name": "taxifare-prediction-model",
        "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
        "target_field_name": "fare",
        "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
        "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
        "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
        "existing_model": False
    }
    
  2. Criar e executar um job de pipeline.

    # Create a pipeline job
    job = aiplatform.PipelineJob(
        display_name="triggered_custom_regression_evaluation",
        template_path=TEMPLATE_URI ,
        parameter_values=parameters,
        pipeline_root=BUCKET_URI,
        enable_caching=False
    )
    # Run the pipeline job
    job.run()
    

    O job leva cerca de 30 minutos para ser concluído.

  3. No console, um novo pipeline deve ser executado na página Pipelines:

    Acessar "Execuções do pipeline"

  4. Depois que a execução do pipeline for concluída, será exibido um novo modelo chamado taxifare-prediction-model ou uma nova versão de modelo no Vertex AI Model Registry:

    Acessar o Model Registry

  5. Também deve aparecer um novo job de previsão em lote:

    Acessar "Previsões em lote"

Criar uma função que acione o pipeline

Nesta etapa, você cria uma função do Cloud (2a geração) que executa o pipeline sempre que novos dados são inseridos na tabela do BigQuery.

Especificamente, usamos um Eventarc para acionar a função sempre que ocorre um evento google.cloud.bigquery.v2.JobService.InsertJob. Em seguida, a função executa o modelo de pipeline.

Para mais informações, consulte Acionadores do Eventarc e Tipos de evento compatíveis.

Criar função com o gatilho do Eventarc

  1. No console do Google Cloud, acesse o Cloud Functions.

    Acesse o Cloud Functions

  2. Clique no botão Criar função. Na página Configuração:

    1. Selecione 2a geração como seu ambiente.

    2. Em Nome da função, use mlops.

    3. Em Região, selecione a mesma região do bucket do Cloud Storage e do repositório do Artifact Registry.

    4. Em Acionador, selecione Outro acionador. O painel Gatilho do Eventarc é aberto.

      1. Em Tipo de acionador, escolha Fontes do Google.

      2. Em Provedor de eventos, escolha BigQuery.

      3. Em Tipo de evento, escolha google.cloud.bigquery.v2.JobService.InsertJob.

      4. Em Recurso, escolha Recurso específico e especifique a tabela do BigQuery

        projects/PROJECT_ID/datasets/mlops/tables/chicago
        
      5. No campo Região, selecione um local para o acionador do Eventarc, se aplicável. Consulte Local do acionador para mais informações.

      6. Clique em Salvar acionador.

    5. Se for solicitado que você conceda papéis a contas de serviço, clique em Conceder todos.

  3. Clique em Avançar para acessar a página Código. Na página Código:

    1. Defina o Ambiente de execução como python 3.12.

    2. Defina o Ponto de entrada como mlops_entrypoint.

    3. Com o editor in-line, abra o arquivo main.py e substitua o conteúdo pelo seguinte:

      Substitua PROJECT_ID,REGION,BUCKET_NAME pelos valores usados anteriormente.

      import json
      import functions_framework
      import requests
      import google.auth
      import google.auth.transport.requests
      # CloudEvent function to be triggered by an Eventarc Cloud Audit Logging trigger
      # Note: this is NOT designed for second-party (Cloud Audit Logs -> Pub/Sub) triggers!
      @functions_framework.cloud_event
      def mlops_entrypoint(cloudevent):
          # Print out the CloudEvent's (required) `type` property
          # See http://github.com/cloudevents/spec/blob/v1.0.1/spec.md#type
          print(f"Event type: {cloudevent['type']}")
      
          # Print out the CloudEvent's (optional) `subject` property
          # See http://github.com/cloudevents/spec/blob/v1.0.1/spec.md#subject
          if 'subject' in cloudevent:
              # CloudEvent objects don't support `get` operations.
              # Use the `in` operator to verify `subject` is present.
              print(f"Subject: {cloudevent['subject']}")
      
          # Print out details from the `protoPayload`
          # This field encapsulates a Cloud Audit Logging entry
          # See http://cloud.go888ogle.com.fqhub.com/logging/docs/audit#audit_log_entry_structure
      
          payload = cloudevent.data.get("protoPayload")
          if payload:
              print(f"API method: {payload.get('methodName')}")
              print(f"Resource name: {payload.get('resourceName')}")
              print(f"Principal: {payload.get('authenticationInfo', dict()).get('principalEmail')}")
              row_count = payload.get('metadata', dict()).get('tableDataChange',dict()).get('insertedRowsCount')
              print(f"No. of rows: {row_count} !!")
              if row_count:
                  if int(row_count) > 0:
                      print ("Pipeline trigger Condition met !!")
                      submit_pipeline_job()
              else:
                  print ("No pipeline triggered !!!")
      
      def submit_pipeline_job():
          PROJECT_ID = 'PROJECT_ID'
          REGION = 'REGION'
          BUCKET_NAME = "BUCKET_NAME"
          DATASET_NAME = "mlops"
          TABLE_NAME = "chicago"
      
          base_output_dir = BUCKET_NAME
          BUCKET_URI = "gs://{}".format(BUCKET_NAME)
          PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI)
          PIPELINE_NAME = "vertex-mlops-pipeline-tutorial"
          EXPERIMENT_NAME = PIPELINE_NAME + "-experiment"
          REPO_NAME ="mlops"
          TEMPLATE_NAME="custom-model-training-evaluation-pipeline"
          TRAINING_JOB_DISPLAY_NAME="taxifare-prediction-training-job"
          worker_pool_specs = [{
                              "machine_spec": {"machine_type": "e2-highmem-2"},
                              "replica_count": 1,
                              "python_package_spec":{
                                      "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
                                      "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
                                      "python_module": "trainer.task",
                                      "args":["--project-id",PROJECT_ID,"--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"]
                              },
          }]
      
          parameters = {
              "project": PROJECT_ID,
              "location": REGION,
              "training_job_display_name": "taxifare-prediction-training-job",
              "worker_pool_specs": worker_pool_specs,
              "base_output_dir": BUCKET_URI,
              "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
              "model_display_name": "taxifare-prediction-model",
              "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
              "target_field_name": "fare",
              "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
              "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
              "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
              "existing_model": False
          }
          TEMPLATE_URI = f"http://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
          print("TEMPLATE URI: ", TEMPLATE_URI)
          request_body = {
              "name": PIPELINE_NAME,
              "displayName": PIPELINE_NAME,
              "runtimeConfig":{
                  "gcsOutputDirectory": PIPELINE_ROOT,
                  "parameterValues": parameters,
              },
              "templateUri": TEMPLATE_URI
          }
          pipeline_url = "http://us-central1-aiplatform.googleapis.com/v1/projects/{}/locations/{}/pipelineJobs".format(PROJECT_ID, REGION)
          creds, project = google.auth.default()
          auth_req = google.auth.transport.requests.Request()
          creds.refresh(auth_req)
          headers = {
          'Authorization': 'Bearer {}'.format(creds.token),
          'Content-Type': 'application/json; charset=utf-8'
          }
          response = requests.request("POST", pipeline_url, headers=headers, data=json.dumps(request_body))
          print(response.text)
      
    4. Abra o arquivo requirements.txt e substitua o conteúdo pelo seguinte:

      requests==2.31.0
      google-auth==2.25.1
      
  4. Clique em implantar para implantar a função.

Inserir dados para acionar o pipeline

  1. No console do Google Cloud, acesse o BigQuery Studio.

    Acessar o BigQuery

  2. Clique em Criar consulta SQL e execute a consulta SQL a seguir clicando em Executar.

    INSERT INTO `PROJECT_ID.mlops.chicago`
    (
        WITH
          taxitrips AS (
          SELECT
            trip_start_timestamp,
            trip_end_timestamp,
            trip_seconds,
            trip_miles,
            payment_type,
            pickup_longitude,
            pickup_latitude,
            dropoff_longitude,
            dropoff_latitude,
            tips,
            tolls,
            fare,
            pickup_community_area,
            dropoff_community_area,
            company,
            unique_key
          FROM
            `bigquery-public-data.chicago_taxi_trips.taxi_trips`
          WHERE pickup_longitude IS NOT NULL
          AND pickup_latitude IS NOT NULL
          AND dropoff_longitude IS NOT NULL
          AND dropoff_latitude IS NOT NULL
          AND trip_miles > 0
          AND trip_seconds > 0
          AND fare > 0
          AND EXTRACT(YEAR FROM trip_start_timestamp) = 2022
        )
    
        SELECT
          trip_start_timestamp,
          EXTRACT(MONTH from trip_start_timestamp) as trip_month,
          EXTRACT(DAY from trip_start_timestamp) as trip_day,
          EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
          EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
          trip_seconds,
          trip_miles,
          payment_type,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
          ) AS pickup_grid,
          ST_AsText(
              ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
          ) AS dropoff_grid,
          ST_Distance(
              ST_GeogPoint(pickup_longitude, pickup_latitude),
              ST_GeogPoint(dropoff_longitude, dropoff_latitude)
          ) AS euclidean,
          CONCAT(
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude,
                  pickup_latitude), 0.1)),
              ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude,
                  dropoff_latitude), 0.1))
          ) AS loc_cross,
          IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
          tips,
          tolls,
          fare,
          pickup_longitude,
          pickup_latitude,
          dropoff_longitude,
          dropoff_latitude,
          pickup_community_area,
          dropoff_community_area,
          company,
          unique_key,
          trip_end_timestamp
        FROM
          taxitrips
        LIMIT 1000000
    )
    

    Esta consulta SQL para inserir novas linhas na tabela.

  3. Para verificar se o evento foi acionado, pesquise pipeline trigger condition met no registro da função.

    Acesse o Cloud Functions

  4. Se a função for acionada corretamente, um novo pipeline será executado no Vertex AI Pipelines. O job do pipeline leva cerca de 30 minutos para ser concluído.

    Acessar o Vertex AI Pipelines

Limpar

Para limpar todos os recursos do Google Cloud usados neste projeto, exclua o projeto do Google Cloud usado no tutorial.

Caso contrário, exclua os recursos individuais criados para este tutorial.

  1. Exclua o notebook do Colab.

    Acessar o Colab Enterprise

  2. Excluir conjunto de dados no BigQuery.

    Acessar o BigQuery

  3. Exclua o Cloud Storage.

    Acesse o Cloud Storage

  4. Excluir recursos da Vertex AI.

  5. Exclua um repositório do Artifact Registry.

    Acessar o Artifact Registry

  6. Excluir função do Cloud

    Acesse o Cloud Functions