Importer des valeurs de caractéristiques par lot

L'importation par lots vous permet d'importer des valeurs de caractéristiques de manière groupée à partir d'une source de données valide. Dans une requête d'importation par lots, vous pouvez importer des valeurs pour un maximum de 100 caractéristiques pour un type d'entité. Notez que vous ne pouvez avoir qu'un seul job d'importation par lots en cours d'exécution par type d'entité pour éviter les conflits.

Dans une requête d'importation par lots, spécifiez l'emplacement de vos données sources et leur correspondance avec les caractéristiques de votre magasin. Comme chaque requête d'importation par lots concerne un type d'entité unique, vos données sources doivent également appartenir à un type d'entité unique.

Une fois l'importation terminée, les valeurs des caractéristiques sont disponibles pour les opérations de lecture ultérieures.

Performances des jobs d'importation

Vertex AI Feature Store (ancien) offre une importation à haut débit, mais avec une latence minimale pouvant atteindre quelques minutes. Chaque requête envoyée à Vertex AI Feature Store (ancien) démarre un job afin d'effectuer le travail demandé. Un job d'importation prend plusieurs minutes, même lorsque vous n'importez qu'un seul enregistrement.

Si vous souhaitez ajuster les performances d'un job, modifiez les deux variables suivantes :

  • Nombre de nœuds de diffusion en ligne du magasin de caractéristiques.
  • Nombre de nœuds de calcul utilisés pour le job d'importation. Les nœuds de calcul traitent et écrivent des données dans le magasin de caractéristiques.

Le nombre de nœuds de calcul recommandé correspond à un nœud de calcul par tranche de 10 nœuds de diffusion en ligne sur le magasin de caractéristiques. Vous pouvez augmenter la taille de ces tranches si la charge de diffusion en ligne est faible. Vous pouvez spécifier un maximum de 100 nœuds de calcul. Pour obtenir plus de conseils, consultez la section Surveiller et régler les ressources en conséquence pour optimiser l'importation par lots.

Si le cluster de diffusion en ligne est sous-provisionné, le job d'importation peut échouer. En cas d'échec, relancez la requête d'importation à un moment où la charge de diffusion en ligne est plus faible, ou augmentez le nombre de nœuds de votre magasin de caractéristiques avant de retenter la requête.

Si le magasin de caractéristiques ne dispose pas d'un magasin en ligne (zéro nœud de publication en ligne), le job d'importation n'écrit que dans le magasin hors connexion et les performances du job dépendent uniquement du nombre de nœuds de calcul d'importation.

Cohérence des données

Des incohérences peuvent être introduites si les données sources sont modifiées lors de l'importation. Assurez-vous que toutes les modifications de données sources sont terminées avant de démarrer un job d'importation. De plus, les valeurs de caractéristique en double peuvent entraîner la diffusion de valeurs différentes entre des requêtes en ligne et des requêtes par lot. Assurez-vous de disposer d'une valeur de caractéristique pour chaque paire d'ID d'entité et d'horodatage.

Si une opération d'importation échoue, le featurestore peut contenir des données partielles, ce qui peut également renvoyer des valeurs incohérentes entre les requêtes de diffusion en ligne et par lot. Pour éviter cette incohérence, réessayez la même requête d'importation et attendez que la requête se termine.

Valeurs Null et tableaux vides

Lors de l'importation, Vertex AI Feature Store (ancien) considère les valeurs scalaires Null ou les tableaux vides comme des valeurs vides. Cela inclut les valeurs vides dans une colonne CSV. Vertex AI Feature Store (ancien) n'accepte pas les valeurs Null non scalaires, telles qu'une valeur null dans un tableau.

Lors de la diffusion en ligne et de la diffusion par lot, Vertex AI Feature Store (ancien) renvoie la dernière valeur non nulle ou non vide de la caractéristique. Si une valeur historique de la caractéristique n'est pas disponible, Vertex AI Feature Store (ancien) renvoie null.

Valeurs NaN

Vertex AI Feature Store (ancien) accepte les valeurs NaN ("Not a Number", pas un nombre) dans Double et DoubleArray. Lors de l'importation, vous pouvez saisir NaN dans le fichier CSV d'entrée de diffusion pour représenter une valeur NaN. Lors de la diffusion en ligne et de la diffusion par lot, Vertex AI Feature Store (ancien) renvoie NaN pour les valeurs NaN.

Importation par lots

Importez des valeurs de manière groupée dans un magasin de caractéristiques pour une ou plusieurs caractéristiques d'un seul type d'entité.

UI Web

  1. Dans la section "Vertex AI" de Google Cloud Console, accédez à la page Caractéristiques.

    Accéder à la page "Caractéristiques"

  2. Sélectionnez une région dans la liste déroulante Région.
  3. Dans le tableau des caractéristiques, affichez la colonne Type d'entité et recherchez le type d'entité contenant les caractéristiques pour lesquelles vous souhaitez importer des valeurs.
  4. Cliquez sur le nom du type d'entité.
  5. Dans la barre d'action, cliquez sur Ingérer des valeurs.
  6. Dans le champ Source de données, sélectionnez l'une des options suivantes :
    • Fichier CSV Cloud Storage : sélectionnez cette option pour importer des données provenant de plusieurs fichiers CSV à partir de Cloud Storage. Spécifiez le chemin d'accès et le nom du fichier CSV. Pour spécifier des fichiers supplémentaires, cliquez sur Ajouter un autre fichier.
    • Fichier AVRO Cloud Storage : sélectionnez cette option pour importer les données d'un fichier AVRO à partir de Cloud Storage. Indiquez le chemin d'accès et le nom du fichier AVRO.
    • Table BigQuery : sélectionnez cette option pour importer les données d'une table ou d'une vue BigQuery. Parcourez et sélectionnez une table ou une vue à utiliser, qui se présente sous la forme suivante : PROJECT_ID.DATASET_ID.TABLE_ID
  7. Cliquez sur Continuer.
  8. Pour Mapper les colonnes aux caractéristiques, spécifiez les colonnes de vos données sources qui correspondent aux entités et aux caractéristiques de votre magasin de caractéristiques.
    1. Indiquez le nom de colonne dans vos données sources contenant les ID d'entité.
    2. Pour le code temporel, spécifiez une colonne de code temporel dans vos données sources ou spécifiez un code temporel unique associé à toutes les valeurs de caractéristiques que vous importez.
    3. Dans la liste des caractéristiques, saisissez le nom de la colonne de données source correspondant à chaque caractéristique. Par défaut, Vertex AI Feature Store (ancien) suppose que le nom de caractéristique et le nom de colonne correspondent.
  9. Cliquez sur Ingérer.

REST

Pour importer des valeurs de caractéristiques pour des caractéristiques existantes, envoyez une requête POST à l'aide de la méthode featurestores.entityTypes.importFeatureValues. Veuillez noter que si les noms des colonnes de données sources et les identifiants des caractéristiques de destination sont différents, il faut inclure le paramètre sourceField.

Avant d'utiliser les données de requête ci-dessous, effectuez les remplacements suivants :

  • LOCATION_ID : région dans laquelle le featurestore est créé. Exemple :us-central1
  • PROJECT_ID : l'ID de votre projet.
  • FEATURESTORE_ID : ID du featurestore.
  • ENTITY_TYPE_ID: ID du type d'entité.
  • ENTITY_SOURCE_COLUMN_ID: ID de la colonne source contenant les ID d'entité.
  • FEATURE_TIME_ID: ID de la colonne source contenant les horodatages de caractéristique pour les valeurs de caractéristique.
  • FEATURE_ID: ID d'une caractéristique existante du featurestore pour l'importation des valeurs.
  • FEATURE_SOURCE_COLUMN_ID: ID de la colonne source contenant des valeurs de caractéristiques pour les entités.
  • SOURCE_DATA_DETAILS : emplacement des données sources, indiquant également le format, tel que "bigquerySource": { "inputUri": "bq://test.dataset.sourcetable" } pour une table ou une vue BigQuery.
  • WORKER_COUNT: nombre de nœuds de calcul à utiliser pour écrire des données dans le featurestore.

Méthode HTTP et URL :

POST http://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues

Corps JSON de la requête :

{
  "entityIdField": "ENTITY_SOURCE_COLUMN_ID",
  "featureTimeField": "FEATURE_TIME_ID",
  SOURCE_DATA_DETAILS,
  "featureSpecs": [{
    "id": "FEATURE_ID",
    "sourceField": "FEATURE_SOURCE_COLUMN_ID"
  }],
  "workerCount": WORKER_COUNT
}

Pour envoyer votre requête, choisissez l'une des options suivantes :

curl

Enregistrez le corps de la requête dans un fichier nommé request.json, puis exécutez la commande suivante :

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"http://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues"

PowerShell

Enregistrez le corps de la requête dans un fichier nommé request.json, puis exécutez la commande suivante :

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "http://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues" | Select-Object -Expand Content

Des résultats semblables aux lignes suivantes devraient s'afficher : Vous pouvez utiliser OPERATION_ID dans la réponse pour obtenir l'état de l'opération.

{
  "name": "projects/PROJECT_NUMBER/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata",
    "genericMetadata": {
      "createTime": "2021-03-02T00:04:13.039166Z",
      "updateTime": "2021-03-02T00:04:13.039166Z"
    }
  }
}

Python

Pour savoir comment installer ou mettre à jour le SDK Vertex AI pour Python, consultez la section Installer le SDK Vertex AI pour Python. Pour en savoir plus, consultez la documentation de référence de l'API Python.

import datetime
from typing import List, Union

from google.cloud import aiplatform

def import_feature_values_sample(
    project: str,
    location: str,
    entity_type_id: str,
    featurestore_id: str,
    feature_ids: List[str],
    feature_time: Union[str, datetime.datetime],
    gcs_source_uris: Union[str, List[str]],
    gcs_source_type: str,
):

    aiplatform.init(project=project, location=location)

    my_entity_type = aiplatform.featurestore.EntityType(
        entity_type_name=entity_type_id, featurestore_id=featurestore_id
    )

    my_entity_type.ingest_from_gcs(
        feature_ids=feature_ids,
        feature_time=feature_time,
        gcs_source_uris=gcs_source_uris,
        gcs_source_type=gcs_source_type,
    )

Python

La bibliothèque cliente pour Vertex AI est incluse lorsque vous installez le SDK Vertex AI pour Python. Pour savoir comment installer le SDK Vertex AI pour Python, consultez la section Installer le SDK Vertex AI pour Python. Pour en savoir plus, consultez la documentation de référence de l'API SDK Vertex AI pour Python.

from google.cloud import aiplatform

def import_feature_values_sample(
    project: str,
    featurestore_id: str,
    entity_type_id: str,
    avro_gcs_uri: str,
    entity_id_field: str,
    feature_time_field: str,
    worker_count: int = 2,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    timeout: int = 300,
):
    # The AI Platform services require regional API endpoints, which need to be
    # in the same region or multi-region overlap with the Feature Store location.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.FeaturestoreServiceClient(client_options=client_options)
    entity_type = f"projects/{project}/locations/{location}/featurestores/{featurestore_id}/entityTypes/{entity_type_id}"
    avro_source = aiplatform.gapic.AvroSource(
        gcs_source=aiplatform.gapic.GcsSource(uris=[avro_gcs_uri])
    )
    feature_specs = [
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="age"),
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="gender"),
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="liked_genres"),
    ]
    import_feature_values_request = aiplatform.gapic.ImportFeatureValuesRequest(
        entity_type=entity_type,
        avro_source=avro_source,
        feature_specs=feature_specs,
        entity_id_field=entity_id_field,
        feature_time_field=feature_time_field,
        worker_count=worker_count,
    )
    lro_response = client.import_feature_values(request=import_feature_values_request)
    print("Long running operation:", lro_response.operation.name)
    import_feature_values_response = lro_response.result(timeout=timeout)
    print("import_feature_values_response:", import_feature_values_response)

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java décrites dans le guide de démarrage rapide de Vertex AI à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Vertex AI Java.

Pour vous authentifier auprès de Vertex AI, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.aiplatform.v1.AvroSource;
import com.google.cloud.aiplatform.v1.EntityTypeName;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceClient;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceSettings;
import com.google.cloud.aiplatform.v1.GcsSource;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest.FeatureSpec;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ImportFeatureValuesSample {

  public static void main(String[] args)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    // TODO(developer): Replace these variables before running the sample.
    String project = "YOUR_PROJECT_ID";
    String featurestoreId = "YOUR_FEATURESTORE_ID";
    String entityTypeId = "YOUR_ENTITY_TYPE_ID";
    String entityIdField = "YOUR_ENTITY_FIELD_ID";
    String featureTimeField = "YOUR_FEATURE_TIME_FIELD";
    String gcsSourceUri = "YOUR_GCS_SOURCE_URI";
    int workerCount = 2;
    String location = "us-central1";
    String endpoint = "us-central1-aiplatform.googleapis.com:443";
    int timeout = 300;
    importFeatureValuesSample(
        project,
        featurestoreId,
        entityTypeId,
        gcsSourceUri,
        entityIdField,
        featureTimeField,
        workerCount,
        location,
        endpoint,
        timeout);
  }

  static void importFeatureValuesSample(
      String project,
      String featurestoreId,
      String entityTypeId,
      String gcsSourceUri,
      String entityIdField,
      String featureTimeField,
      int workerCount,
      String location,
      String endpoint,
      int timeout)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    FeaturestoreServiceSettings featurestoreServiceSettings =
        FeaturestoreServiceSettings.newBuilder().setEndpoint(endpoint).build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (FeaturestoreServiceClient featurestoreServiceClient =
        FeaturestoreServiceClient.create(featurestoreServiceSettings)) {
      List<FeatureSpec> featureSpecs = new ArrayList<>();

      featureSpecs.add(FeatureSpec.newBuilder().setId("title").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("genres").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("average_rating").build());
      ImportFeatureValuesRequest importFeatureValuesRequest =
          ImportFeatureValuesRequest.newBuilder()
              .setEntityType(
                  EntityTypeName.of(project, location, featurestoreId, entityTypeId).toString())
              .setEntityIdField(entityIdField)
              .setFeatureTimeField(featureTimeField)
              .addAllFeatureSpecs(featureSpecs)
              .setWorkerCount(workerCount)
              .setAvroSource(
                  AvroSource.newBuilder()
                      .setGcsSource(GcsSource.newBuilder().addUris(gcsSourceUri)))
              .build();
      OperationFuture<ImportFeatureValuesResponse, ImportFeatureValuesOperationMetadata>
          importFeatureValuesFuture =
              featurestoreServiceClient.importFeatureValuesAsync(importFeatureValuesRequest);
      System.out.format(
          "Operation name: %s%n", importFeatureValuesFuture.getInitialFuture().get().getName());
      System.out.println("Waiting for operation to finish...");
      ImportFeatureValuesResponse importFeatureValuesResponse =
          importFeatureValuesFuture.get(timeout, TimeUnit.SECONDS);
      System.out.println("Import Feature Values Response");
      System.out.println(importFeatureValuesResponse);
      featurestoreServiceClient.close();
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions de configuration pour Node.js décrites dans le guide de démarrage rapide de Vertex AI à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Vertex AI Node.js.

Pour vous authentifier auprès de Vertex AI, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment these variables before running the sample.\
 * (Not necessary if passing values as arguments)
 */

// const project = 'YOUR_PROJECT_ID';
// const featurestoreId = 'YOUR_FEATURESTORE_ID';
// const entityTypeId = 'YOUR_ENTITY_TYPE_ID';
// const avroGcsUri = 'AVRO_FILE_IN_THE_GCS_URI';
// const entityIdField = 'ENTITY_ID_FIELD_IN_AVRO';
// const featureTimeField = 'TIMESTAMP_FIELD_IN_AVRO';
// const workerCount = <NO_OF_WORKERS_FOR_INGESTION_JOB>;
// const location = 'YOUR_PROJECT_LOCATION';
// const apiEndpoint = 'YOUR_API_ENDPOINT';
// const timeout = <TIMEOUT_IN_MILLI_SECONDS>;

// Imports the Google Cloud Featurestore Service Client library
const {FeaturestoreServiceClient} = require('@google-cloud/aiplatform').v1;

// Specifies the location of the api endpoint
const clientOptions = {
  apiEndpoint: apiEndpoint,
};

// Instantiates a client
const featurestoreServiceClient = new FeaturestoreServiceClient(
  clientOptions
);

async function importFeatureValues() {
  // Configure the entityType resource
  const entityType = `projects/${project}/locations/${location}/featurestores/${featurestoreId}/entityTypes/${entityTypeId}`;

  const avroSource = {
    gcsSource: {
      uris: [avroGcsUri],
    },
  };

  const featureSpecs = [{id: 'age'}, {id: 'gender'}, {id: 'liked_genres'}];

  const request = {
    entityType: entityType,
    avroSource: avroSource,
    entityIdField: entityIdField,
    featureSpecs: featureSpecs,
    featureTimeField: featureTimeField,
    workerCount: Number(workerCount),
  };

  // Import Feature Values Request
  const [operation] = await featurestoreServiceClient.importFeatureValues(
    request,
    {timeout: Number(timeout)}
  );
  const [response] = await operation.promise();

  console.log('Import feature values response');
  console.log('Raw response:');
  console.log(JSON.stringify(response, null, 2));
}
importFeatureValues();

Afficher les jobs d'importation

Utilisez la console Google Cloud pour afficher les jobs d'importation par lot dans un projet Google Cloud.

UI Web

  1. Dans la section "Vertex AI" de Google Cloud Console, accédez à la page Caractéristiques.

    Accéder à la page "Caractéristiques"

  2. Sélectionnez une région dans la liste déroulante Région.
  3. Dans la barre d'action, cliquez sur Afficher les tâches d'ingestion pour répertorier les jobs d'importation de tous les magasins de caractéristiques.
  4. Cliquez sur l'ID d'un job d'importation pour afficher ses détails, tels que sa source de données, le nombre d'entités importées et le nombre de valeurs de caractéristiques importées.

Écraser les données existantes dans un magasin de caractéristiques

Vous pouvez réimporter les valeurs des caractéristiques pour écraser les valeurs existantes si elles ont les mêmes horodatages. Vous n'avez pas besoin de supprimer préalablement les valeurs existantes des caractéristiques. Par exemple, vous exploitez des données sources sous-jacentes qui ont été récemment modifiées. Pour que votre magasin de caractéristiques reste cohérent avec ces données sous-jacentes, importez à nouveau vos valeurs de caractéristiques. Si vous avez des horodatages non concordants, les valeurs importées sont considérées comme uniques et les anciennes valeurs continuent d'exister (elles ne sont pas écrasées).

Pour assurer la cohérence entre les requêtes de livraison par lots et en ligne, attendez que le job d'importation soit terminé avant d'effectuer des requêtes de diffusion.

Remplir les données de l'historique

Si vous remplissez des données, et que vous importez des valeurs de caractéristiques passées, désactivez la diffusion en ligne pour votre job d'importation. La diffusion en ligne sert uniquement à diffuser les valeurs de caractéristiques les plus récentes, ce qui n'est pas inclus par le remplissage. La désactivation de la diffusion en ligne est utile, car vous éliminez la charge sur vos nœuds de diffusion en ligne et vous augmentez le débit de votre job d'importation, ce qui peut réduire sa durée d'exécution.

Vous pouvez désactiver la diffusion en ligne pour les jobs d'importation lorsque vous utilisez l'API ou les bibliothèques clientes. Pour en savoir plus, consultez le champ disableOnlineServing de la méthode importFeatureValue.

Étape suivante