Esegui lo streaming di un argomento Kafka in Hive

Apache Kafka è una piattaforma di streaming distribuita open source per pipeline di dati e integrazione dei dati in tempo reale. Fornisce un sistema di streaming efficiente e scalabile per l'utilizzo in una varietà di applicazioni, tra cui:

  • Analisi in tempo reale
  • Elaborazione dei flussi
  • Aggregazione dei log
  • Messaggistica distribuita
  • Streaming di eventi

Obiettivi

  1. Installa Kafka su un cluster Managed Service per Apache Spark HA con ZooKeeper (in questo tutorial indicato come "cluster Managed Service per Apache Spark Kafka").

  2. Crea dati fittizi dei clienti, quindi pubblicali in un argomento Kafka.

  3. Crea tabelle Hive parquet e ORC in Cloud Storage per ricevere i dati degli argomenti Kafka in streaming.

  4. Invia un job PySpark per abbonarti e trasmettere in streaming l'argomento Kafka in Cloud Storage in formato Parquet e ORC.

  5. Esegui una query sui dati della tabella Hive in streaming per conteggiare i messaggi Kafka in streaming.

Costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il calcolatore prezzi.

I nuovi utenti di Google Cloud potrebbero avere diritto a una prova senza costi.

Al termine delle attività descritte in questo documento, puoi evitare l'addebito di ulteriori costi eliminando le risorse che hai creato. Per saperne di più, consulta Esegui la pulizia.

Prima di iniziare

Se non l'hai ancora fatto, crea un progetto Google Cloud .

  1. Accedi al tuo account Google Cloud . Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. Nella console Google Cloud , vai alla pagina Bucket in Cloud Storage.

    Vai a Bucket

  9. Fai clic su Crea.
  10. Nella pagina Crea un bucket, inserisci le informazioni del bucket. Per andare al passaggio successivo, fai clic su Continua.
    1. Nella sezione Inizia, segui questi passaggi:
    2. Nella sezione Scegli dove archiviare i tuoi dati, segui questi passaggi:
      1. Seleziona un Tipo di località.
      2. Scegli una posizione in cui i dati del bucket vengono archiviati in modo permanente dal menu a discesa Tipo di località.
        • Se selezioni il tipo di località a due regioni, puoi anche scegliere di attivare la replica turbo utilizzando la casella di controllo pertinente.
      3. Per configurare la replica tra bucket, seleziona Aggiungi una replica tra bucket mediante Storage Transfer Service e segui questi passaggi:

        Configura la replica tra bucket

        1. Nel menu Bucket, seleziona un bucket.
        2. Nella sezione Impostazioni di replica, fai clic su Configura per configurare le impostazioni per il job di replica.

          Viene visualizzato il riquadro Configura replica tra bucket.

          • Per filtrare gli oggetti da replicare in base al prefisso del nome dell'oggetto, inserisci un prefisso da cui includere o escludere gli oggetti, quindi fai clic su Aggiungi un prefisso.
          • Per impostare una classe di archiviazione per gli oggetti replicati, seleziona una classe di archiviazione dal menu Classe di archiviazione. Se salti questo passaggio, gli oggetti replicati utilizzeranno per impostazione predefinita la classe di archiviazione del bucket di destinazione.
          • Fai clic su Fine.
    3. Nella sezione Scegli come archiviare i tuoi dati, segui questi passaggi:
      1. Seleziona una classe di archiviazione predefinita per il bucket o Autoclass per la gestione automatica della classe di archiviazione dei dati del bucket.
      2. Per attivare lo spazio dei nomi gerarchico, nella sezione Ottimizza l'archiviazione per workload con uso intensivo dei dati, seleziona Abilita uno spazio dei nomi gerarchico in questo bucket.
    4. Nella sezione Scegli come controllare l'accesso agli oggetti, seleziona se il bucket applica o meno la prevenzione dell'accesso pubblico e seleziona un metodo di controllo dell'accesso per gli oggetti del bucket.
    5. Nella sezione Scegli come proteggere i dati degli oggetti, segui questi passaggi:
      • Seleziona una delle opzioni in Protezione dei dati che vuoi impostare per il bucket.
        • Per attivare l'eliminazione temporanea, fai clic sulla casella di controllo Criterio di eliminazione temporanea (per il recupero dei dati) e specifica il numero di giorni per cui vuoi conservare gli oggetti dopo l'eliminazione.
        • Per impostare il controllo delle versioni degli oggetti, seleziona la casella di controllo Controllo delle versioni degli oggetti (per il controllo delle versioni) e specifica il numero massimo di versioni per oggetto e il numero di giorni dopo i quali scadono le versioni non correnti.
        • Per abilitare il criterio di conservazione su oggetti e bucket, seleziona la casella di controllo Conservazione (per la conformità), quindi procedi nel seguente modo:
          • Per attivare il blocco della conservazione degli oggetti, fai clic sulla casella di controllo Abilita conservazione degli oggetti.
          • Per attivare Bucket Lock, fai clic sulla casella di controllo Imposta criterio di conservazione del bucket e scegli un'unità di tempo e una durata per il periodo di conservazione.
      • Per scegliere come verranno criptati i dati degli oggetti, espandi la sezione Crittografia dei dati () e seleziona un metodo di crittografia dei dati.
  11. Fai clic su Crea.

Passaggi del tutorial

Segui questi passaggi per creare un cluster Managed Service per Apache Spark Kafka per leggere un argomento Kafka in Cloud Storage in formato parquet O ORC.

Copia lo script di installazione di Kafka in Cloud Storage

Lo script dell'kafka.sh azione di inizializzazione installa Kafka su un cluster Managed Service per Apache Spark.

  1. Sfoglia il codice.

    #!/bin/bash
    #    Copyright 2015 Google, Inc.
    #
    #    Licensed under the Apache License, Version 2.0 (the "License");
    #    you may not use this file except in compliance with the License.
    #    You may obtain a copy of the License at
    #
    #        http://www.apache.org/licenses/LICENSE-2.0
    #
    #    Unless required by applicable law or agreed to in writing, software
    #    distributed under the License is distributed on an "AS IS" BASIS,
    #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #    See the License for the specific language governing permissions and
    #    limitations under the License.
    #
    # This script installs Apache Kafka (http://kafka.apache.org) on a Google Cloud
    # Dataproc cluster.
    
    set -euxo pipefail
    
    readonly ZOOKEEPER_HOME=/usr/lib/zookeeper
    readonly KAFKA_HOME=/usr/lib/kafka
    readonly KAFKA_PROP_FILE='/etc/kafka/conf/server.properties'
    readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
    readonly RUN_ON_MASTER="$(/usr/share/google/get_metadata_value attributes/run-on-master || echo false)"
    readonly KAFKA_ENABLE_JMX="$(/usr/share/google/get_metadata_value attributes/kafka-enable-jmx || echo false)"
    readonly KAFKA_JMX_PORT="$(/usr/share/google/get_metadata_value attributes/kafka-jmx-port || echo 9999)"
    readonly INSTALL_KAFKA_PYTHON="$(/usr/share/google/get_metadata_value attributes/install-kafka-python || echo false)"
    
    # The first ZooKeeper server address, e.g., "cluster1-m-0:2181".
    ZOOKEEPER_ADDRESS=''
    # Integer broker ID of this node, e.g., 0
    BROKER_ID=''
    
    function retry_apt_command() {
      cmd="$1"
      for ((i = 0; i < 10; i++)); do
        if eval "$cmd"; then
          return 0
        fi
        sleep 5
      done
      return 1
    }
    
    function recv_keys() {
      if [[ ${OS} == debian ]] && [[ $(echo "${DATAPROC_IMAGE_VERSION} >= 3.0" | bc -l) == 1 ]]; then
        retry_apt_command "apt-get update && apt-get install -y gnupg"
        export GNUPGHOME="$(mktemp -d)"
        trap 'rm -rf "${GNUPGHOME}"' EXIT
        gpg --keyserver keyserver.ubuntu.com --recv-keys B7B3B788A8D3785C
        mkdir -p /etc/apt/trusted.gpg.d
        gpg --export B7B3B788A8D3785C > /etc/apt/trusted.gpg.d/mysql-repo.gpg
      else
        retry_apt_command "apt-get install -y gnupg2 && \
          apt-key adv --keyserver keyserver.ubuntu.com --recv-keys B7B3B788A8D3785C"
      fi
    }
    
    function update_apt_get() {
      retry_apt_command "apt-get update"
    }
    
    function install_apt_get() {
      pkgs="$@"
      retry_apt_command "apt-get install -y $pkgs"
    }
    
    function err() {
      echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $@" >&2
      return 1
    }
    
    # Returns the list of broker IDs registered in ZooKeeper, e.g., " 0, 2, 1,".
    function get_broker_list() {
      ${KAFKA_HOME}/bin/zookeeper-shell.sh "${ZOOKEEPER_ADDRESS}" \
        <<<"ls /brokers/ids" |
        grep '\[.*\]' |
        sed 's/\[/ /' |
        sed 's/\]/,/'
    }
    
    # Waits for zookeeper to be up or time out.
    function wait_for_zookeeper() {
      for i in {1..20}; do
        if "${ZOOKEEPER_HOME}/bin/zkCli.sh" -server "${ZOOKEEPER_ADDRESS}" ls /; then
          return 0
        else
          echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}" >&2
      exit 1
    }
    
    # Wait until the current broker is registered or time out.
    function wait_for_kafka() {
      for i in {1..20}; do
        local broker_list=$(get_broker_list || true)
        if [[ "${broker_list}" == *" ${BROKER_ID},"* ]]; then
          return 0
        else
          echo "Kafka broker ${BROKER_ID} is not registered yet, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to start Kafka broker ${BROKER_ID}." >&2
      exit 1
    }
    
    function install_and_configure_kafka_server() {
      # Find zookeeper list first, before attempting any installation.
      local zookeeper_client_port
      zookeeper_client_port=$(grep 'clientPort' /etc/zookeeper/conf/zoo.cfg |
        tail -n 1 |
        cut -d '=' -f 2)
    
      local zookeeper_list
      zookeeper_list=$(grep '^server\.' /etc/zookeeper/conf/zoo.cfg |
        cut -d '=' -f 2 |
        cut -d ':' -f 1 |
        sort |
        uniq |
        sed "s/$/:${zookeeper_client_port}/" |
        xargs echo |
        sed "s/ /,/g")
    
      if [[ -z "${zookeeper_list}" ]]; then
        # Didn't find zookeeper quorum in zoo.cfg, but possibly workers just didn't
        # bother to populate it. Check if YARN HA is configured.
        zookeeper_list=$(bdconfig get_property_value --configuration_file \
          /etc/hadoop/conf/yarn-site.xml \
          --name yarn.resourcemanager.zk-address 2>/dev/null)
      fi
    
      # If all attempts failed, error out.
      if [[ -z "${zookeeper_list}" ]]; then
        err 'Failed to find configured Zookeeper list; try "--num-masters=3" for HA'
      fi
    
      ZOOKEEPER_ADDRESS="${zookeeper_list%%,*}"
    
      # Install Kafka from Dataproc distro.
      install_apt_get kafka-server || dpkg -l kafka-server ||
        err 'Unable to install and find kafka-server.'
    
      mkdir -p /var/lib/kafka-logs
      chown kafka:kafka -R /var/lib/kafka-logs
    
      if [[ "${ROLE}" == "Master" ]]; then
        # For master nodes, broker ID starts from 10,000.
        if [[ "$(hostname)" == *-m ]]; then
          # non-HA
          BROKER_ID=10000
        else
          # HA
          BROKER_ID=$((10000 + $(hostname | sed 's/.*-m-\([0-9]*\)$/\1/g')))
        fi
      else
        # For worker nodes, broker ID is a random number generated less than 10000.
        # 10000 is choosen since the max broker ID allowed being set is 10000.
        BROKER_ID=$((RANDOM % 10000))
      fi
      sed -i 's|log.dirs=/tmp/kafka-logs|log.dirs=/var/lib/kafka-logs|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's|^\(zookeeper\.connect=\).*|\1'${zookeeper_list}'|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's,^\(broker\.id=\).*,\1'${BROKER_ID}',' \
        "${KAFKA_PROP_FILE}"
      echo -e '\nreserved.broker.max.id=100000' >>"${KAFKA_PROP_FILE}"
      echo -e '\ndelete.topic.enable=true' >>"${KAFKA_PROP_FILE}"
    
      if [[ "${KAFKA_ENABLE_JMX}" == "true" ]]; then
        sed -i '/kafka-run-class.sh/i export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Djava.net.preferIPv4Stack=true"' /usr/lib/kafka/bin/kafka-server-start.sh
        sed -i "/kafka-run-class.sh/i export JMX_PORT=${KAFKA_JMX_PORT}" /usr/lib/kafka/bin/kafka-server-start.sh
      fi
    
      wait_for_zookeeper
    
      # Start Kafka.
      service kafka-server restart
    
      wait_for_kafka
    }
    
    function install_kafka_python_package() {
      KAFKA_PYTHON_PACKAGE="kafka-python==2.0.2"
      if [[ "${INSTALL_KAFKA_PYTHON}" != "true" ]]; then
        return
      fi
    
      if [[ "$(echo "${DATAPROC_IMAGE_VERSION} > 2.0" | bc)" -eq 1 ]]; then
        /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}"; }
      else
        OS=$(. /etc/os-release && echo "${ID}")
        if [[ "${OS}" == "rocky" ]]; then
          yum install -y python2-pip
        else
          apt-get install -y python-pip
        fi
        pip2 install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; pip2 install "${KAFKA_PYTHON_PACKAGE}"; } || { sleep 10; pip install "${KAFKA_PYTHON_PACKAGE}"; }
      fi
    }
    
    function remove_old_backports {
      # This script uses 'apt-get update' and is therefore potentially dependent on
      # backports repositories which have been archived.  In order to mitigate this
      # problem, we will remove any reference to backports repos older than oldstable
    
      # https://github.com/GoogleCloudDataproc/initialization-actions/issues/1157
      oldstable=$(curl -s https://deb.debian.org/debian/dists/oldstable/Release | awk '/^Codename/ {print $2}');
      stable=$(curl -s https://deb.debian.org/debian/dists/stable/Release | awk '/^Codename/ {print $2}');
    
      matched_files="$(grep -rsil '\-backports' /etc/apt/sources.list*)"
      if [[ -n "$matched_files" ]]; then
        for filename in "$matched_files"; do
          grep -e "$oldstable-backports" -e "$stable-backports" "$filename" || \
            sed -i -e 's/^.*-backports.*$//' "$filename"
        done
      fi
    }
    
    function main() {
      OS=$(. /etc/os-release && echo "${ID}")
      if [[ ${OS} == debian ]] && [[ $(echo "${DATAPROC_IMAGE_VERSION} <= 2.1" | bc -l) == 1 ]]; then
        remove_old_backports
      fi
      recv_keys || err 'Unable to receive keys.'
      update_apt_get || err 'Unable to update packages lists.'
      install_kafka_python_package
    
      # Only run the installation on workers; verify zookeeper on master(s).
      if [[ "${ROLE}" == 'Master' ]]; then
        service zookeeper-server status ||
          err 'Required zookeeper-server not running on master!'
        if [[ "${RUN_ON_MASTER}" == "true" ]]; then
          # Run installation on masters.
          install_and_configure_kafka_server
        else
          # On master nodes, just install kafka command-line tools and libs but not
          # kafka-server.
          install_apt_get kafka ||
            err 'Unable to install kafka libraries on master!'
        fi
      else
        # Run installation on workers.
        install_and_configure_kafka_server
      fi
    }
    
    main
    

  2. Copia lo script kafka.sh dell'azione di inizializzazione nel bucket Cloud Storage. Questo script installa Kafka su un cluster Managed Service per Apache Spark.

    1. Apri Cloud Shell ed esegui questo comando:

      gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
      

      Effettua le seguenti sostituzioni:

      • REGION: kafka.sh è archiviato in bucket pubblici con tag regionali in Cloud Storage. Specifica una regione di Compute Engine geograficamente vicina (ad esempio us-central1).
      • BUCKET_NAME: il nome del bucket Cloud Storage.

Crea un cluster Managed Service per Apache Spark Kafka

  1. Apri Cloud Shell, poi esegui il seguente comando gcloud dataproc clusters create per creare un cluster HA Managed Service per Apache Spark che installa i componenti Kafka e ZooKeeper:

    gcloud dataproc clusters create KAFKA_CLUSTER \
        --project=PROJECT_ID \
        --region=REGION \
        --image-version=2.1-debian11 \
        --num-masters=3 \
        --enable-component-gateway \
        --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
    

    Note:

    • KAFKA_CLUSTER: il nome del cluster, che deve essere univoco all'interno di un progetto. Il nome deve iniziare con una lettera minuscola e può contenere fino a 51 lettere minuscole, numeri e trattini. Non può terminare con un trattino. Il nome di un cluster eliminato può essere riutilizzato.
    • PROJECT_ID: il progetto da associare a questo cluster.
    • REGION: la regione di Compute Engine in cui si troverà il cluster, ad esempio us-central1.
      • Puoi aggiungere il flag facoltativo --zone=ZONE per specificare una zona all'interno della regione specificata, ad esempio us-central1-a. Se non specifichi una zona, la funzionalità di posizionamento automatico delle zone di Managed Service for Apache Spark seleziona una zona con la regione specificata.
    • --image-version: versione dell'immagine di Managed Service per Apache Spark 2.1-debian11 è consigliata per questo tutorial. Nota: ogni versione dell'immagine contiene un insieme di componenti preinstallati, incluso il componente Hive utilizzato in questo tutorial (vedi Versioni dell'immagine Managed Service for Apache Spark supportate).
    • --num-master: i nodi master 3 creano un cluster HA. Il componente Zookeeper, richiesto da Kafka, è preinstallato su un cluster HA.
    • --enable-component-gateway: attiva il gateway dei componenti di Managed Service per Apache Spark.
    • BUCKET_NAME: il nome del bucket Cloud Storage che contiene lo script di inizializzazione /scripts/kafka.sh (vedi Copiare lo script di installazione di Kafka in Cloud Storage).

Crea un argomento Kafka custdata

Per creare un argomento Kafka sul cluster Kafka di Managed Service per Apache Spark:

  1. Utilizza l'utilità SSH per aprire una finestra del terminale sulla VM master del cluster.

  2. Crea un argomento Kafka custdata.

    /usr/lib/kafka/bin/kafka-topics.sh \
        --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
        --create --topic custdata
    

    Note:

    • KAFKA_CLUSTER: inserisci il nome del tuo cluster Kafka. -w-0:9092 indica il broker Kafka in esecuzione sulla porta 9092 sul nodo worker-0.

    • Dopo aver creato l'argomento custdata, puoi eseguire i seguenti comandi:

      # List all topics.
      /usr/lib/kafka/bin/kafka-topics.sh \
          --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
          --list
      
      # Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \     --bootstrap-server KAFKA_CLUSTER-w-0:9092 \     --topic custdata
      # Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \     --broker-list KAFKA_CLUSTER-w-0:9092 \     --topic custdata
      # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \     --bootstrap-server KAFKA_CLUSTER-w-0:9092 \     --delete --topic custdata

Pubblica contenuti nell'argomento Kafka custdata

Il seguente script utilizza lo strumento kafka-console-producer.sh Kafka per generare dati fittizi dei clienti in formato CSV.

  1. Copia e incolla lo script nel terminale SSH sul nodo master del cluster Kafka. Premi <return> per eseguire lo script.

    for i in {1..10000}; do \
    custname="cust name${i}"
    uuid=$(dbus-uuidgen)
    age=$((45 + $RANDOM % 45))
    amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))")
    message="${uuid}:${custname},${age},${amount}"
    echo ${message}
    done | /usr/lib/kafka/bin/kafka-console-producer.sh \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata \
    --property "parse.key=true" \
    --property "key.separator=:"
    

    Note:

    • KAFKA_CLUSTER: il nome del cluster Kafka.
  2. Esegui questo comando Kafka per verificare che l'argomento custdata contenga 10.000 messaggi.

    /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata
    

    Note:

    • KAFKA_CLUSTER: il nome del cluster Kafka.

    Output previsto:

    custdata:0:10000
    

Crea tabelle Hive in Cloud Storage

Crea tabelle Hive per ricevere i dati degli argomenti Kafka in streaming. Segui questi passaggi per creare tabelle Hive cust_parquet (parquet) e cust_orc (ORC) nel bucket Cloud Storage.

  1. Inserisci il tuo BUCKET_NAME nel seguente script, quindi copia e incolla lo script nel terminale SSH sul nodo master del cluster Kafka, poi premi <return> per creare uno script ~/hivetables.hql (Hive Query Language).

    Nel passaggio successivo eseguirai lo script ~/hivetables.hql per creare tabelle Hive parquet e ORC nel bucket Cloud Storage.

    cat > ~/hivetables.hql <<EOF
    drop table if exists cust_parquet;
    create external table if not exists cust_parquet
    (uuid string, custname string, age string, amount string)
    row format delimited fields terminated by ','
    stored as parquet
    location "gs://BUCKET_NAME/tables/cust_parquet";
    

    drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
  2. Nel terminale SSH sul nodo master del cluster Kafka, invia il job Hive ~/hivetables.hql per creare le tabelle Hive cust_parquet (parquet) e cust_orc (ORC) nel bucket Cloud Storage.

    gcloud dataproc jobs submit hive \
        --cluster=KAFKA_CLUSTER \
        --region=REGION \
        -f ~/hivetables.hql
    

    Note:

    • Il componente Hive è preinstallato nel cluster Managed Service per Apache Spark Kafka. Consulta le versioni di rilascio 2.1.x per un elenco delle versioni dei componenti Hive incluse nelle immagini 2.1 rilasciate di recente.
    • KAFKA_CLUSTER: il nome del cluster Kafka.
    • REGION: la regione in cui si trova il cluster Kafka.

Trasmetti in streaming Kafka custdata alle tabelle Hive

  1. Esegui il comando seguente nel terminale SSH sul nodo master del cluster Kafka per installare la libreria kafka-python. È necessario un client Kafka per trasmettere i dati degli argomenti Kafka a Cloud Storage.
    pip install kafka-python
    
  2. Inserisci il tuo BUCKET_NAME, quindi copia e incolla il seguente codice PySpark nel terminale SSH sul nodo master del cluster Kafka e premi <return> per creare un file streamdata.py.

    Lo script si iscrive all'argomento Kafka custdata, quindi trasmette i dati alle tabelle Hive in Cloud Storage. Il formato di output, che può essere Parquet o ORC, viene passato allo script come parametro.

    cat > streamdata.py <<EOF
    #!/bin/python
    
    import sys
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    from kafka import KafkaConsumer
    
    def getNameFn (data): return data.split(",")[0]
    def getAgeFn  (data): return data.split(",")[1]
    def getAmtFn  (data): return data.split(",")[2]
    
    def main(cluster, outputfmt):
        spark = SparkSession.builder.appName("APP").getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        Logger = spark._jvm.org.apache.log4j.Logger
        logger = Logger.getLogger(__name__)
    
        rows = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \
        .option("startingOffsets", "earliest")\
        .load()
    
        getNameUDF = udf(getNameFn, StringType())
        getAgeUDF  = udf(getAgeFn,  StringType())
        getAmtUDF  = udf(getAmtFn,  StringType())
    
        logger.warn("Params passed in are cluster name: " + cluster + "  output format(sink): " + outputfmt)
    
        query = rows.select (col("key").cast("string").alias("uuid"),\
            getNameUDF      (col("value").cast("string")).alias("custname"),\
            getAgeUDF       (col("value").cast("string")).alias("age"),\
            getAmtUDF       (col("value").cast("string")).alias("amount"))
    
        writer = query.writeStream.format(outputfmt)\
                .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\
                .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \
            .outputMode("append")\
            .start()
    
        writer.awaitTermination()
    
    if __name__=="__main__":
        if len(sys.argv) < 2:
            print ("Invalid number of arguments passed ", len(sys.argv))
            print ("Usage: ", sys.argv[0], " cluster  format")
            print ("e.g.:  ", sys.argv[0], " <cluster_name>  orc")
            print ("e.g.:  ", sys.argv[0], " <cluster_name>  parquet")
        main(sys.argv[1], sys.argv[2])
    
    EOF
    
  3. Nel terminale SSH sul nodo master del cluster Kafka, esegui spark-submit per trasmettere i dati in streaming alle tabelle Hive in Cloud Storage.

    1. Inserisci il nome del tuo KAFKA_CLUSTER e dell'output FORMAT, quindi copia e incolla il seguente codice nel terminale SSH sul nodo master del tuo cluster Kafka, quindi premi <return> per eseguire il codice e trasmettere in streaming i dati custdata di Kafka in formato Parquet alle tue tabelle Hive in Cloud Storage.

      spark-submit --packages \
      org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \
          --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
          --conf spark.driver.memory=4096m \
          --conf spark.executor.cores=2 \
          --conf spark.executor.instances=2 \
          --conf spark.executor.memory=6144m \
          streamdata.py KAFKA_CLUSTER FORMAT
          

      Note:

      • KAFKA_CLUSTER: inserisci il nome del tuo cluster Kafka.
      • FORMAT: specifica parquet o orc come formato di output. Puoi eseguire il comando in successione per trasmettere in streaming entrambi i formati alle tabelle Hive: ad esempio, nella prima chiamata, specifica parquet per trasmettere in streaming l'argomento Kafka custdata alla tabella Hive parquet; poi, nella seconda chiamata, specifica il formato orc per trasmettere in streaming custdata alla tabella Hive ORC.
  4. Quando l'output standard si interrompe nel terminale SSH, il che indica che tutto il custdata è stato trasmesso in streaming, premi <control-c> nel terminale SSH per interrompere il processo.

  5. Elenca le tabelle Hive in Cloud Storage.

    gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
    

    Note:

    • BUCKET_NAME: inserisci il nome del bucket Cloud Storage che contiene le tabelle Hive (vedi Creare tabelle Hive).

Eseguire query sui dati in streaming

  1. Nel terminale SSH sul nodo master del cluster Kafka, esegui il seguente comando hive per conteggiare i messaggi Kafka custdata trasmessi in streaming nelle tabelle Hive in Cloud Storage.

    hive -e "select count(1) from TABLE_NAME"
    

    Note:

    • TABLE_NAME: specifica cust_parquet o cust_orc come nome della tabella Hive.

    Snippet di output previsto:

...
Status: Running (Executing on YARN cluster with App id application_....)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
Reducer 2 ...... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)

Esegui la pulizia

Elimina il progetto

    Elimina un progetto Google Cloud :

    gcloud projects delete PROJECT_ID

Elimina risorse

  • Elimina il bucket:
    gcloud storage buckets delete BUCKET_NAME
  • Elimina il cluster Kafka:
    gcloud dataproc clusters delete KAFKA_CLUSTER \
        --region=${REGION}