Usar o conector do Bigtable do Spark

Com o conector Spark do Bigtable, é possível ler e gravar dados no Bigtable e a partir dele. Leia dados do seu aplicativo Spark usando o Spark SQL e o DataFrames. As seguintes operações do Bigtable são compatíveis com o uso do conector do Bigtable Spark:

  • Gravar dados
  • Ler dados
  • Criar uma tabela

Neste documento, mostramos como converter uma tabela DataFrames do Spark SQL em uma tabela do Bigtable e, em seguida, compilar e criar um arquivo JAR para enviar um job do Spark.

Status de suporte do Spark e do Scala

O conector Spark do Bigtable aceita apenas a versão Scala 2.12 e as seguintes versões do Spark:

O conector Spark do Bigtable é compatível com as seguintes versões do Dataproc:

Calcular os custos

Se você decidir usar qualquer um dos seguintes componentes faturáveis do Google Cloud, você será cobrado pelos recursos que usar:

  • Bigtable (você não recebe cobranças pelo uso do emulador do Bigtable)
  • Dataproc
  • Cloud Storage

Os preços do Dataproc se aplicam ao uso do Dataproc em clusters do Compute Engine. Os preços do Dataproc sem servidor se aplicam a cargas de trabalho e sessões executadas no Dataproc sem servidor para Spark.

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.

Antes de começar

Conclua os pré-requisitos a seguir antes de usar o conector do Bigtable Spark.

Funções exigidas

Para receber as permissões necessárias para usar o conector do Bigtable Spark, peça ao administrador para conceder a você os seguintes papéis do IAM no seu projeto:

  • Administrador do Bigtable (roles/bigtable.admin)(opcional): permite ler ou gravar dados e criar uma nova tabela.
  • Usuário do Bigtable (roles/bigtable.user): permite ler ou gravar dados, mas não permite criar uma nova tabela.

Para mais informações sobre como conceder papéis, consulte Gerenciar acesso.

Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.

Se você estiver usando o Dataproc ou o Cloud Storage, permissões adicionais podem ser necessárias. Para mais informações, consulte Permissões do Dataproc e do Cloud Storage.

Configurar o Spark

Além de criar uma instância do Bigtable, você também precisa configurar sua instância do Spark. É possível fazer isso localmente ou selecionar uma destas opções para usar o Spark com o Dataproc:

  • Cluster do Dataproc
  • Dataproc sem servidor

Para mais informações sobre como escolher entre um cluster do Dataproc ou uma opção sem servidor, consulte a documentação Dataproc sem servidor para Spark em comparação com o Dataproc no Compute Engine .

Fazer o download do arquivo JAR do conector

O código-fonte do conector Spark do Bigtable pode ser encontrado com exemplos no repositório GitHub do conector do Bigtable no GitHub (em inglês).

Com base na sua configuração do Spark, é possível acessar o arquivo JAR da seguinte forma:

  • Se você estiver executando o PySpark localmente, faça o download do arquivo JAR do conector no local gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar do Cloud Storage.

    Substitua SCALA_VERSION pela versão do Scala, defina 2.12 como a única versão compatível e CONNECTOR_VERSION pela versão do conector que você quer usar.

  • Para o cluster do Dataproc ou a opção sem servidor, use o arquivo JAR mais recente como um artefato que pode ser adicionado aos aplicativos Scala ou Java Spark. Para mais informações sobre como usar o arquivo JAR como um artefato, consulte Gerenciar dependências.

  • Se você estiver enviando o job do PySpark ao Dataproc, use a sinalização gcloud dataproc jobs submit pyspark --jars para definir o URI no local do arquivo JAR no Cloud Storage, por exemplo, gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar.

Adicionar a configuração do Bigtable ao aplicativo Spark

No aplicativo Spark, adicione as opções do Spark para interagir com o Bigtable.

Opções do Spark com suporte

Use as opções do Spark disponíveis como parte do pacote com.google.cloud.spark.bigtable.

Nome da opção Obrigatório Valor padrão Significado
spark.bigtable.project.id Sim N/A Defina o ID do projeto do Bigtable.
spark.bigtable.instance.id Sim N/A Definir o ID da instância do Bigtable.
catalog Sim N/A Defina o formato JSON que especifica o formato de conversão entre o esquema do tipo SQL do DataFrame e o esquema da tabela do Bigtable.

Consulte Criar metadados de tabela no formato JSON para mais informações.
spark.bigtable.app_profile.id Não default Defina o ID do perfil do app Bigtable.
spark.bigtable.write.timestamp.milliseconds Não Hora atual do sistema Defina o carimbo de data/hora em milissegundos para usar ao gravar um DataFrame no Bigtable.

Como todas as linhas no DataFrame usam o mesmo carimbo de data/hora, as linhas com a mesma coluna de chave de linha no DataFrame persistem como uma única versão no Bigtable porque compartilham o mesmo carimbo de data/hora.
spark.bigtable.create.new.table Não false Defina como true para criar uma nova tabela antes de gravar no Bigtable.
spark.bigtable.read.timerange.start.milliseconds ou spark.bigtable.read.timerange.end.milliseconds Não N/A Defina carimbos de data/hora (em milissegundos desde o horário da época) para filtrar as células com datas específicas de início e término, respectivamente. É preciso especificar ambos ou nenhum desses parâmetros.
spark.bigtable.push.down.row.key.filters Não true Defina como true para permitir a filtragem simples de chave de linha no lado do servidor. A filtragem em chaves de linha compostas é implementada no lado do cliente.

Consulte Ler uma linha específica do DataFrame usando um filtro para mais informações.
spark.bigtable.read.rows.attempt.timeout.milliseconds Não 30min Defina a duração do tempo limite de uma tentativa de leitura de linhas correspondente a uma partição do DataFrame no cliente Bigtable para Java.
spark.bigtable.read.rows.total.timeout.milliseconds Não 12h Defina a duração do tempo limite total de uma tentativa de leitura de linhas correspondente a uma partição do DataFrame no cliente Bigtable para Java.
spark.bigtable.mutate.rows.attempt.timeout.milliseconds Não 1 min Defina a duração do tempo limite de uma tentativa de modificação de linhas correspondente a uma partição do DataFrame no cliente Bigtable para Java.
spark.bigtable.mutate.rows.total.timeout.milliseconds Não 10 min Defina a duração do tempo limite total de uma tentativa de modificação de linhas correspondente a uma partição do DataFrame no cliente Bigtable para Java.
spark.bigtable.batch.mutate.size Não 100 Defina conforme o número de mutações em cada lote. O valor máximo que você pode definir é 100000.
spark.bigtable.enable.batch_mutate.flow_control Não false Defina como true para ativar o controle de fluxo para mutações em lote.

Criar metadados de tabela no formato JSON

O formato da tabela DataFrames do Spark SQL precisa ser convertido em uma tabela do Bigtable usando uma string com o formato JSON. Esse formato JSON de string torna o formato de dados compatível com o Bigtable. É possível transmitir o formato JSON no código do aplicativo usando a opção .option("catalog", catalog_json_string).

Por exemplo, considere a seguinte tabela do DataFrame e a tabela do Bigtable correspondente.

Neste exemplo, as colunas name e birthYear no DataFrame são agrupadas no grupo de colunas info e renomeadas como name e birth_year, respectivamente. Da mesma forma, a coluna address é armazenada no grupo de colunas location com o mesmo nome de coluna. A coluna id do DataFrame é convertida na chave de linha do Bigtable.

As chaves de linha não têm um nome de coluna dedicado no Bigtable e, neste exemplo, id_rowkey é usado apenas para indicar ao conector que essa é a coluna da chave de linha. É possível usar qualquer nome para a coluna da chave de linha e garantir que você use o mesmo nome ao declarar o campo "rowkey":"column_name" no formato JSON.

DataFrame Tabela do Bigtable = t1
Colunas Chave de linha Grupos de colunas
informações local
Colunas Colunas
id name birthYear address id_rowkey name birth_year address

O formato JSON para o catálogo é o seguinte:

    """
    {
      "table": {"name": "t1"},
      "rowkey": "id_rowkey",
      "columns": {
        "id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
        "name": {"cf": "info", "col": "name", "type": "string"},
        "birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
        "address": {"cf": "location", "col": "address", "type": "string"}
      }
    }
    """

As chaves e os valores usados no formato JSON são os seguintes:

Chave do catálogo Valor do catálogo Formato JSON
table Nome da tabela do Bigtable. "table":{"name":"t1"}

Se a tabela não existir, use .option("spark.bigtable.create.new.table", "true") para criá-la.
chave de linha Nome da coluna que será usada como chave de linha do Bigtable. Verifique se o nome da coluna do DataFrame é usado como a chave de linha, por exemplo, id_rowkey.

Chaves compostas também são aceitas como chaves de linha. Por exemplo, "rowkey":"name:address". Essa abordagem pode resultar em chaves de linha que exigem uma verificação completa da tabela para todas as solicitações de leitura.
"rowkey":"id_rowkey",
colunas Mapeamento de cada coluna do DataFrame para o grupo de colunas correspondente do Bigtable ("cf") e o nome da coluna ("col"). O nome da coluna pode ser diferente do nome da coluna na tabela do DataFrame. Os tipos de dados com suporte incluem string, long e binary. "columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}"

Neste exemplo, id_rowkey é a chave de linha e info e location são os grupos de colunas.

Tipos de dados compatíveis

O conector é compatível com o uso dos tipos string, long e binary (matriz de bytes) no catálogo. Até que o suporte para outros tipos, como int e float, seja adicionado, é possível converter manualmente esses tipos de dados em matrizes de bytes (BinaryType do Spark SQL) antes de usar o conector para gravá-los no Bigtable.

Além disso, é possível usar o Avro para serializar tipos complexos, como ArrayType. Para mais informações, consulte Serializar tipos de dados complexos usando o Apache Avro.

Gravar no Bigtable

Use a função .write() e as opções compatíveis para gravar seus dados no Bigtable.

Java

O código a seguir do repositório do GitHub usa Java e Maven para gravar no Bigtable.

  String catalog = "{" +
        "\"table\":{\"name\":\"" + tableName + "\"," +
        "\"tableCoder\":\"PrimitiveType\"}," +
        "\"rowkey\":\"wordCol\"," +
        "\"columns\":{" +
        "\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
        "\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
        "}}".replaceAll("\\s+", "");

…

  private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
        String createNewTable) {
      dataframe
          .write()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .option("spark.bigtable.create.new.table", createNewTable)
          .save();
    }

Python

O código a seguir do repositório do GitHub usa o Python para gravar no Bigtable.

  catalog = ''.join(("""{
        "table":{"name":" """ + bigtable_table_name + """
        ", "tableCoder":"PrimitiveType"},
        "rowkey":"wordCol",
        "columns":{
          "word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
          "count":{"cf":"example_family", "col":"countCol", "type":"long"}
        }
        }""").split())
  …

  input_data = spark.createDataFrame(data)
  print('Created the DataFrame:')
  input_data.show()

  input_data.write \
        .format('bigtable') \
        .options(catalog=catalog) \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .option('spark.bigtable.create.new.table', create_new_table) \
        .save()
  print('DataFrame was written to Bigtable.')

  …

Ler do Bigtable

Use a função .read() para verificar se a tabela foi importada para o Bigtable.

Java

  …
  private static Dataset<Row> readDataframeFromBigtable(String catalog) {
      Dataset<Row> dataframe = spark
          .read()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .load();
      return dataframe;
    }

Python

  …

  records = spark.read \
        .format('bigtable') \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .options(catalog=catalog) \
        .load()

  print('Reading the DataFrame from Bigtable:')
  records.show()

Compilar seu projeto

Gere o arquivo JAR usado para executar um job em um cluster do Dataproc, sem servidor do Dataproc ou em uma instância local do Spark. É possível compilar o arquivo JAR localmente e usá-lo para enviar um job. O caminho para o JAR compilado é definido como a variável de ambiente PATH_TO_COMPILED_JAR quando você envia um job.

Esta etapa não se aplica aos aplicativos do PySpark.

Gerenciar dependências

O conector Spark do Bigtable oferece suporte às ferramentas de gerenciamento de dependências a seguir:

Compilar o arquivo JAR

Maven

  1. Adicione a dependência spark-bigtable ao arquivo pom.xml.

    <dependencies>
    <dependency>
      <groupId>com.google.cloud.spark.bigtable</groupId>
      <artifactId>spark-bigtable_SCALA_VERSION</artifactId>
      <version>0.1.0</version>
    </dependency>
    </dependencies>
    
  2. Adicione o plug-in Maven Shade ao seu arquivo pom.xml para criar um JAR uber:

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    
  3. Execute o comando mvn clean install para gerar um arquivo JAR.

sbt

  1. Adicione a dependência spark-bigtable ao seu arquivo build.sbt:

    libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
    
  2. Adicione o plug-in sbt-assembly ao arquivo project/plugins.sbt ou project/assembly.sbt para criar um arquivo Uber JAR.

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
    
  3. Execute o comando sbt clean assembly para gerar o arquivo JAR.

Gradle

  1. Adicione a dependência spark-bigtable ao arquivo build.gradle.

    dependencies {
    implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0'
    }
    
  2. Adicione o plug-in Shadow ao arquivo build.gradle para criar um arquivo JAR uber:

    plugins {
    id 'com.github.johnrengelman.shadow' version '8.1.1'
    id 'java'
    }
    
  3. Consulte a documentação do plug-in Shadow para mais informações sobre configuração e compilação JAR.

Envie um job

Envie um job do Spark usando o Dataproc, o Dataproc sem servidor ou uma instância local do Spark para iniciar o aplicativo.

Definir ambiente de execução

Defina as seguintes variáveis de ambiente.

      #Google Cloud
      export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
      export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
      export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
      export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
      export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
      export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE

      #Dataproc Serverless
      export BIGTABLE_SPARK_SUBNET=SUBNET
      export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME

      #Scala/Java
      export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR

      #PySpark
      export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
      export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
      export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR

Substitua:

  • PROJECT_ID: o identificador permanente do projeto do Bigtable.
  • INSTANCE_ID: o identificador permanente da instância do Bigtable.
  • TABLE_NAME: identificador permanente da tabela.
  • DATAPROC_CLUSTER: o identificador permanente do cluster do Dataproc.
  • DATAPROC_REGION: a região do Dataproc que contém um dos clusters na sua instância do Dataproc, por exemplo, northamerica-northeast2.
  • DATAPROC_ZONE: a zona em que o cluster do Dataproc é executado.
  • SUBNET: o caminho completo do recurso da sub-rede.
  • GCS_BUCKET_NAME: o bucket do Cloud Storage para fazer upload de dependências de carga de trabalho do Spark.
  • PATH_TO_COMPILED_JAR: o caminho completo ou relativo para o JAR compilado. Por exemplo, /path/to/project/root/target/<compiled_JAR_name> para o Maven.
  • GCS_PATH_TO_CONNECTOR_JAR: o bucket gs://spark-lib/bigtable do Cloud Storage, em que o arquivo spark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar está localizado.
  • PATH_TO_PYTHON_FILE: para aplicativos PySpark, o caminho para o arquivo Python que será usado para gravar e ler dados do Bigtable.
  • LOCAL_PATH_TO_CONNECTOR_JAR: para aplicativos PySpark, o caminho para o arquivo JAR do conector do Bigtable do Spark transferido por download.

Enviar um job do Spark

Para instâncias do Dataproc ou da configuração local do Spark, execute um job do Spark para fazer upload dos dados no Bigtable.

Cluster do Dataproc

Usar o arquivo JAR compilado e criar um job de cluster do Dataproc que leia e grave dados no Bigtable.

  1. Criar um cluster de Dataproc. O exemplo a seguir mostra um comando de amostra para criar um cluster do Dataproc v2.0 com o Debian 10, dois nós de trabalho e configurações padrão.

    gcloud dataproc clusters create \
      $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \
      --zone $BIGTABLE_SPARK_DATAPROC_ZONE \
      --master-machine-type n2-standard-4 --master-boot-disk-size 500 \
      --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \
      --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
    
  2. Enviar um job.

    Scala/Java

    O exemplo a seguir mostra a classe spark.bigtable.example.WordCount, que inclui a lógica para criar uma tabela de teste no DataFrame, gravar a tabela no Bigtable e contar o número de palavras na tabela.

        gcloud dataproc jobs submit spark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --class=spark.bigtable.example.WordCount \
        --jar=$PATH_TO_COMPILED_JAR \
        -- \
        $BIGTABLE_SPARK_PROJECT_ID \
        $BIGTABLE_SPARK_INSTANCE_ID \
        $BIGTABLE_SPARK_TABLE_NAME \
    

    PySpark

        gcloud dataproc jobs submit pyspark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --jars=$GCS_PATH_TO_CONNECTOR_JAR \
        --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
        $PATH_TO_PYTHON_FILE \
        -- \
        --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
        --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
        --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
    

Dataproc sem servidor

Usar o arquivo JAR compilado e criar um job do Dataproc que leia e grave dados no Bigtable com uma instância sem servidor do Dataproc.

Scala/Java

  gcloud dataproc batches submit spark \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
  --  \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
  --jars=$GCS_PATH_TO_CONNECTOR_JAR \
  --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
  -- \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

Spark local

Use o arquivo JAR transferido por download e crie um job do Spark que leia e grave dados no Bigtable com uma instância local do Spark. Também é possível usar o emulador do Bigtable para enviar o job do Spark.

Usar o emulador do Bigtable

Para usar o emulador do Bigtable, siga estas etapas:

  1. Execute o seguinte comando para iniciar o emulador:

    gcloud beta emulators bigtable start
    

    Por padrão, o emulador escolhe localhost:8086.

  2. Defina a variável de ambiente BIGTABLE_EMULATOR_HOST:

    export BIGTABLE_EMULATOR_HOST=localhost:8086
    
  3. Envie o job do Spark.

Para mais informações sobre como usar o emulador do Bigtable, consulte Testar usando o emulador.

Enviar um job do Spark

Use o comando spark-submit para enviar um job do Spark, independente de você estar usando um emulador local do Bigtable.

Scala/Java

  spark-submit $PATH_TO_COMPILED_JAR \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  spark-submit \
  --jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
  --packages=org.slf4j:slf4j-reload4j:1.7.36 \
  $PATH_TO_PYTHON_FILE \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

Verificar os dados da tabela

Execute o seguinte comando cbt da CLI para verificar se os dados foram gravados no Bigtable. A CLI cbt é um componente da Google Cloud CLI. Para mais informações, consulte a visão geral da CLI cbt.

    cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
    read $BIGTABLE_SPARK_TABLE_NAME

Soluções adicionais

Use o conector Spark do Bigtable para soluções específicas, como serializar tipos complexos do Spark SQL, ler linhas específicas e gerar métricas do lado do cliente.

Ler uma linha específica do DataFrame usando um filtro

Ao usar o DataFrames para ler no Bigtable, você pode especificar um filtro para ler somente linhas específicas. Filtros simples, como ==, <= e startsWith na coluna da chave de linha, são aplicados no lado do servidor para evitar uma verificação completa da tabela. Os filtros em chaves de linha compostas ou complexos, como LIKE na coluna da chave de linha, são aplicados no lado do cliente.

Se você estiver lendo tabelas grandes, recomendamos usar filtros de chave de linha simples para evitar a verificação de toda a tabela. O exemplo de instrução a seguir mostra como ler usando um filtro simples. No filtro do Spark, verifique se você usa o nome da coluna do DataFrame convertida na chave de linha:

    dataframe.filter("id == 'some_id'").show()
  

Ao aplicar um filtro, use o nome da coluna do DataFrame em vez do nome da coluna da tabela do Bigtable.

Serializar tipos de dados complexos usando o Apache Avro

O conector Spark do Bigtable oferece suporte ao uso do Apache Avro para serializar tipos do Spark SQL complexos, como ArrayType, MapType ou StructType. O Apache Avro fornece serialização de dados para dados de registro que são comumente usados para processar e armazenar estruturas de dados complexas.

Use uma sintaxe como "avro":"avroSchema" para especificar que uma coluna no Bigtable precisa ser codificada usando Avro. Em seguida, é possível usar .option("avroSchema", avroSchemaString) ao ler ou gravar no Bigtable para especificar o esquema Avro correspondente a essa coluna no formato de string. É possível usar nomes de opções diferentes, por exemplo, "anotherAvroSchema" para colunas diferentes e transmitir esquemas Avro para várias colunas.

def catalogWithAvroColumn = s"""{
                    |"table":{"name":"ExampleAvroTable"},
                    |"rowkey":"key",
                    |"columns":{
                    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                    |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
                    |}
                    |}""".stripMargin

Usar métricas do lado do cliente

Como o conector Spark do Bigtable é baseado no cliente do Bigtable para Java, as métricas do lado do cliente são ativadas dentro do conector por padrão. Consulte a documentação sobre métricas do lado do cliente para mais detalhes sobre como acessar e interpretar essas métricas.

Usar o cliente do Bigtable para Java com funções RDD de baixo nível

Como o conector Spark do Bigtable é baseado no cliente Bigtable para Java, é possível usar diretamente o cliente nos aplicativos Spark e realizar solicitações de leitura ou gravação distribuídas nas funções RDD de baixo nível, como mapPartitions e foreachPartition.

Para usar o cliente do Bigtable para classes Java, anexe o prefixo com.google.cloud.spark.bigtable.repackaged aos nomes dos pacotes. Por exemplo, em vez de usar o nome da classe como com.google.cloud.bigtable.data.v2.BigtableDataClient, use com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient.

Para mais informações sobre o cliente Bigtable para Java, consulte Cliente do Bigtable para Java.

A seguir