sábado, 27 de julho de 2019

Fast Data Modulo 1: Instalação do Apache Kafka



Kafka é um sistema distribuidor, particionador e replicador de comits de serviços de logs. Ele fornece a funcionalidade de um sistema de mensagens, mas com um design único. O Kafka inscreve e publica registros em um fluxo de mensagens, armazenando esses fluxos de uma maneira durável e tolerante a falhas.



Pré-requisitos:
VM com o CENTOS 7 instalado:
Java 8 instalado na VM.



1    1. Baixar o Apache Kafka na VM, para isso digite a instrução: 

      2. Descompactar o arquivo com o seguinte comando:
    #tar -xzf kafka_2.11-2.2.0.tgz
    #mv kafka_2.11-2.2.0.tgz kafka
    #cd kafka

3. Para configurar o Kafka entre do diretório config da pasta Kafka.

               Copie o arquivo server.properties para um novo server-1.properties

               #cp server.properties server-1.properties
               #vi server-1.properties

               Defina as seguintes configurações:
·        Na propriedade broker.id, coloque o numero identifica o server do Kafka, como nós estamos fazendo em um único nó, vamos colocar o valor 1
·        Na propriedade listeners, vamos inserir o IPV4 da vm.
·        Na log.dirs, configure o diretório de log para o server do Kafka
·        Para a propriedade zookeeper.connect, mencione os ips do zookeeper, no nosso caso será o mesmo da máquina.

broker.id=1
listeners=PLAINTEXT://192.168.56.102:9092
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=192.168.56.102:2181

      4. Inciar o serviço entre na pasta do kafka:

·        Zookeeper, cria em id na pasta tmp e inicie o serviço:

#mkdir /tmp/zookeeper/
#echo "1" > /tmp/zookeeper/myid
#nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

·        Kafka
#nohup bin/kafka-server-start.sh config/server-1.properties &

      5. Criar um tópico:

#bin/kafka-topics.sh --create --zookeeper 192.168.56.102:2181 --replication-factor 1  --partitions 1 --topic nomedotopico

 Listar todos os tópicos

#bin/kafka-topics.sh --list --zookeeper 192.168.56.102:2181


1    6. Testando um Producer e um Consumer:

      Crie duas sessões de acesso ao putty, uma será chamado o consumer:

      #bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.102:9092 --topic elastic --from-beginning
    
      Na outra será criado o produtor de mensagens
       
      #bin/kafka-console-producer.sh --broker-list 192.168.56.102:9092 --topic elastic

      Ao digitar a mensagem no sessão do producer a informação deverá aparecer no consumer.



Essa instalação do Kafka, porem de muito simples, vai ser utilizado como base para se trabalhar com diversas ferramentas de fast data.

Em caso de dúvidas ou sugestões, escreva nos comentários ou nos mande um email: slothbigdata@gmail.com.

quinta-feira, 25 de julho de 2019

Apache Spark com Scala: Manipulação de DataFrame Etapa1


O DataFrame é um coleção distribuida de dados, no qual oferece os beneficios  do RDD  (Resilient Distributed Dataset)  com uma tipificação forte, untilizando as funções Lambdas das liguagem, para a manipulação de registros executando funçoes de join, filter, sum, flatmap, etc.
O DataFrame esta incluso nas bibliotecas Spark SQL, oferecidas pelo Apache Spark e torna a visão dos dados de acordo a uma tabela relacional.

Para executar o exemplo abaixo, é necessário seguir o procedimento do artigo:

https://slothbigdata.blogspot.com/2019/07/ambiente-para-rodar-o-spark-com-scala.html

Para o exercício, será criado uma instância do SparkSession, que esta disponível nas versões do Spark Apache 2.0

package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._


object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
  
   }

}

Instanciado o SparkSession definindo o nome e a engine de processamento, insira o arquivo cidadao.json em uma pasta, este arquivo foi baixado no repositório do git conforme o artigo mencionado acima.
Capture o arquivo json com o seguinte processo.

1 - Cria um StructType para definição do schema da tabela. O StructType é uma classe do Spark no qual efetua o mapeamento do Dataframe.

2 - Carga de dados do Json:


package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._


object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
     import spark.implicits._
    
     //definindo schema
     val schemaCidadao    = StructType(
                              List(
                                StructField("nome",StringType,true),
                                StructField("idade",IntegerType,true),
                                StructField("cidade",StringType,true),
                                StructField("uf",StringType,true),
                                StructField("id_profissao",IntegerType,true)
                              )
                            )        
     //json
     val jsonRDD = spark.sparkContext                                                                             .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
                        .map(x => x._2)
     val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
     //print no Driver
     dfcidadao.collect().map(row => println(row))
    
   }
}



Com o Json estruturado para um dataframe "dfcidadao", agora pode-se efetuar um filtro com dados, no caso abaixo será efetuar um filtro para o estado de SP, para esse processo pode-se ocorrer em três modos:

Informar dataframe("coluna")

val dffilter1 = dfcidadao.filter(dfcidadao("uf")==="SP")

Mas caso utilizar o pacote: org.apache.spark.sql.functions._: col("coluna")

val dffilter1 = dfcidadao.filter(col("uf")==="SP")

Mas se utilizar as classes implicitas: spark.implicits._ : $"coluna"

val dffilter1 = dfcidadao.filter($"uf"==="SP"

Código completo:


package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._


object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
  
     import spark.implicits._
    
     //definindo schema
     val schemaCidadao    = StructType(
                              List(
                                StructField("nome",StringType,true),
                                StructField("idade",IntegerType,true),
                                StructField("cidade",StringType,true),
                                StructField("uf",StringType,true),
                                StructField("id_profissao",IntegerType,true)
                              )
                            )        
     //json
     val jsonRDD = spark.sparkContext
                        .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
                        .map(x => x._2)
     val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
     //filtro para a uf SP
     val dffilter1 = dfcidadao.filter(col("uf")==="SP")
     dffilter1.collect().map(row => println(row))
  
   }
}

Processo de Join

Para o caso de join, jogue em um diretório o do git: profissoes.csv
Agora vamos ler um arquivo em CSV com o seguinte processo.

1 - Criar o StructType com a estrutura de dados do CSV.
2 - Importar o CSV e criar um Dataframe.

package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
  
     import spark.implicits._
    
     //definindo schema
     val schemaCidadao    = StructType(
                              List(
                                StructField("nome",StringType,true),
                                StructField("idade",IntegerType,true),
                                StructField("cidade",StringType,true),
                                StructField("uf",StringType,true),
                                StructField("id_profissao",IntegerType,true)
                              )
                            )        
     //json
     val jsonRDD = spark.sparkContext
                        .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
                        .map(x => x._2)
     val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
     //filtro para a uf SP
     val dffilter1 = dfcidadao.filter(col("uf")==="SP")
    
     //csv
     val schemaProfissao = StructType(
                              List(
                                StructField("id_profissao",IntegerType,true),
                                StructField("descricao",StringType,true)
                              )
                            )   
     val dfprof = spark.read
                       .format("csv")
                       .option("header","true")
                       .schema(schemaProfissao)
                       .load("C:\\dados\\arquivo_cidadao\\profissoes.csv")
     dfprof.collect().map(row => println(row))

   }
}

Para efetuar um Join com o dataframe dffilter1 com o dataframe dfprof pode-se efeuar em duas maneiras:

Método: dataframe1.join(dataframe2, condicao, operacao)

Informando os dataframes:  dataframe(coluna)

dffilter1.join(dfprof,dffilter1("id_profissao") === dfprof("id_profissao"),"inner")

Utilizando a o pacote : org.apache.spark.sql.functions._: Seq("coluna") << nesse caso as colunas relacionadas no dataframe devem ter o mesmo nome.

dffilter1.join(dfprof,Seq("id_profissao"),"inner")


Código completo do Join:

package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
  
     import spark.implicits._
    
     //definindo schema
     val schemaCidadao    = StructType(
                              List(
                                StructField("nome",StringType,true),
                                StructField("idade",IntegerType,true),
                                StructField("cidade",StringType,true),
                                StructField("uf",StringType,true),
                                StructField("id_profissao",IntegerType,true)
                              )
                            )        
     //json
     val jsonRDD = spark.sparkContext
                        .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
                        .map(x => x._2)
     val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
     //filtro para a uf SP
     val dffilter1 = dfcidadao.filter(col("uf")==="SP")
    
     //csv
     val schemaProfissao = StructType(
                              List(
                                StructField("id_profissao",IntegerType,true),
                                StructField("descricao",StringType,true)
                              )
                            )     
    
     val dfprof = spark.read
                       .format("csv")
                       .option("header","true")
                       .schema(schemaProfissao)
                       .load("C:\\dados\\arquivo_cidadao\\profissoes.csv")
    
     val dfCidadaoProfissao = dffilter1.join(dfprof,Seq("id_profissao"),"inner")

     //join das pessoas com as profissoes
     dfCidadaoProfissao.collect().map(row => println(row))    
   }
}



Para finalizar o processo, vamos efetuar uma contagem dos profissionais de SP, a instrução será agrupar as profissões e mostrar as quantidades de profissionais:

val qtdprofissionaissp = dfCidadaoProfissao.groupBy($"descricao").count;
     
Código completo com o Count.

package com.slothbigdata.licao

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object DataFrameFilterJoin {
 
   def main(args: Array[String]): Unit = {
 
     val spark = SparkSession.builder()
                             .appName("Spark Pi")
                             .master("local")
                             .getOrCreate()
     import spark.implicits._
    
     //definindo schema
     val schemaCidadao    = StructType(
                              List(
                                StructField("nome",StringType,true),
                                StructField("idade",IntegerType,true),
                                StructField("cidade",StringType,true),
                                StructField("uf",StringType,true),
                                StructField("id_profissao",IntegerType,true)
                              )
                            )        
     //json
     val jsonRDD = spark.sparkContext
                        .wholeTextFiles("C:\\dados\\arquivo_cidadao\\cidadao.json")
                        .map(x => x._2)
     val dfcidadao = spark.read.schema(schemaCidadao).json(jsonRDD)
     //filtro para a uf SP
     val dffilter1 = dfcidadao.filter(col("uf")==="SP")
     //csv
     val schemaProfissao = StructType(
                              List(
                                StructField("id_profissao",IntegerType,true),
                                StructField("descricao",StringType,true)
                              )
                            )     
     val dfprof = spark.read
                       .format("csv")
                       .option("header","true")
                       .schema(schemaProfissao)
                       .load("C:\\dados\\arquivo_cidadao\\profissoes.csv")
  
     //join das pessoas com as profissoes
     val dfCidadaoProfissao = dffilter1.join(dfprof,Seq("id_profissao"),"inner")
    
     //profissionais no estado de sao paulo
     val qtdprofissionaissp = dfCidadaoProfissao.groupBy($"descricao").count;
     qtdprofissionaissp.collect().map(row => println(row))
   }

}



Em caso de dúvidas ou sugestões, escreva nos comentários ou nos mande um email: slothbigdata@gmail.com.

Veja também: https://slothbigdata.blogspot.com/2019/07/apache-spark-com-scala-manipulacao-do.html