tecnologia | big data | business intelligence | banco de dados

PySpark - Junções (JOIN)

Nesta publicação irei demonstrar como realizar junções (joins) utilizando a linguagem PySpark e fazer um paralelo com o SQL tradicional.

Cria o dataframe 1:
dadosDataframe1 = [
(1, "A")
, (2, "B")
, (3, "C")
, (4, "D")
, (5, "E")]
schemaDataframe1 = ["id", "coluna"]
df1 = spark.createDataFrame(data=dadosDataframe1, schema=schemaDataframe1)
df1.printSchema()
df1.show(truncate=False)

Cria o dataframe 2:
dadosDataframe2 = [
(1, "A")
, (2, "B")
, (3, "D")
, (4, "F")]
schemaDataframe2 = ["id", "coluna"]
df2 = spark.createDataFrame(data=dadosDataframe2, schema=schemaDataframe2)
df2.printSchema()
df2.show(truncate=False)


Inner Join (registros existentes em ambos dataframes):
df1\
.join(df2
, [df1.coluna == df2.coluna]
, how='inner'
)\
.orderBy(
df1.coluna.asc()
)\
.select(
df1.coluna.alias('INNER_JOIN')
)\
.show(10,False)



Left Join (todos os registros do dataframe 1):
df1\
.join(df2
, [df1.coluna == df2.coluna]
, how='left'
)\
.orderBy(
df1.coluna.asc()
)\
.select(
df1.coluna.alias('LEFT_JOIN')
)\
.show(10,False)



Right Join (todos os registros do dataframe 2):
df1\
.join(df2
, [df1.coluna == df2.coluna]
, how='right'
)\
.orderBy(
df2.coluna.asc()
)\
.select(
df2.coluna.alias('RIGHT_JOIN')
)\
.show(10,False)



Full Join (todos os registros de ambos dataframes):
df1\
.join(df2
, [df1.coluna == df2.coluna]
, how='full'
)\
.orderBy(
F.coalesce(df1.coluna, df2.coluna).asc()
)\
.select(
F.coalesce(df1.coluna, df2.coluna).alias('FULL_JOIN')
)\
.show(10,False)



Left Anti Join (registros exclusivos do dataframe 1):
df1\
.join(df2
, [df1.coluna == df2.coluna]
,how='left_anti'
)\
.orderBy(
df1.coluna.asc()
)\
.select(
df1.coluna.alias('LEFT_ANTI')
)\
.show(10,False)



Right Anti Join (registros exclusivos do dataframe 2):
df2\
.join(df1
, [df1.coluna == df2.coluna]
,how='left_anti'
)\
.orderBy(
df2.coluna.asc()
)\
.select(
df2.coluna.alias('RIGHT_ANTI')
)\
.show(10,False)



Full Join Exclusivo (registros exclusivos de cada dataframe):
df1\
.join(df2
, [df1.coluna == df2.coluna]
, how='full'
)\
.where(
(df1.coluna.isNull()) | (df2.coluna.isNull())
) \
.orderBy(
F.coalesce(df1.coluna, df2.coluna).asc()
)\
.select(
F.coalesce(df1.coluna, df2.coluna).alias('FULL_JOIN')
)\
.show(10,False)



Publicações relacionadas:
- PySpark - Criando dataframes;
- Banco de Dados - Junções (JOIN).
Data publicação: 21:37 18/07/2022

PySpark - Criando dataframes

Nesta publicação irei abordar algumas formas de criar dataframes utilizando PySpark.

Criar dataframe de formar manual especificando os dados e os nomes das colunas:

dadosAlunos = [
("A", "A1", 3, "A"),
("A", "A1", 4, None),
("A", "A3", 5, None),
("B", "B3", 1, None),
("B", "B2", 2, "B"),
("B", "B1", 3, None),
("C", "C2", 0, None),
("C", "C1", 2, None),
("D", "D1", 5, None),
("D", "D2", 8, None)
]
dadosAlunosColunas = ["nome", "departamento", "nota", "coluna"]
df = spark.createDataFrame(data=dadosAlunos, schema=dadosAlunosColunas)
df.printSchema()
df.show(truncate=False)


Criar dataframe a partir de um arquivo csv:

df = spark.read.csv(
path='/caminho/empresas/csv',
sep=';',
inferSchema=True,
header=True
)
df.printSchema()
df.show(truncate=False)


Criar dataframe a partir de arquivo(s) parquet:

df = spark.read.parquet(
path='/caminho/empresas/parquet'
)
df.printSchema()
df.show(truncate=False)
Data publicação: 23:21 05/06/2022

Cursos gratuitos (dados, desenvolvimento, Linux, ...)

Nesta publicação irei concentrar cursos gratuitos dos temas que geralmente são abordados por aqui. Sempre que possível, irei manter esta publicação atualizada.

*** Cursos em português:

- Python (Básico):
https://www.youtube.com/watch?v=S9uPNppGsGo&list=PLHz_AreHm4dlKP6QQCekuIPky1CiwmdI6

- Python Fundamentos para Análise de Dados, Big Data Fundamentos, Microsoft Power BI para Data Science, entre outros:
https://www.datascienceacademy.com.br/cursosgratuitos

- Git e GitHub, Linux, Banco de Dados, SQL, Python, Docker, TypeScript e muitos outros:
https://web.dio.me/


*** Cursos em inglês:

- Python:
https://www.youtube.com/watch?v=mRMmlo_Uqcs&list=PLIhvC56v63ILPDA2DQBv0IKzqsWTZxCkp

- Linux:
https://www.youtube.com/watch?v=VbEx7B_PTOE&list=PLIhvC56v63IJIujb5cyE13oLuyORZpdkL

- Bash Scripting:
https://www.youtube.com/watch?v=SPwyp2NG-bE&list=PLIhvC56v63IKioClkSNDjW7iz-6TFvLwS
Data publicação: 21:26 25/05/2022

PySpark - Sintaxes

Nesta publicação irei mencionar brevemente alguns comandos/sintaxes úteis do Spark utilizando a linguagem Python.


Dataframe.show():

Exibe as primeiras n linhas na tela.


Parâmetros:
n: números de linhas a exibir.
truncate: se informado True, irá cortar as strings com mais de 20 caracteres. Se um número maior que 1 for informado, strings maiores que o número informado serão cortadas e alinnhadas a direita.
vertical: se informado True, as linhas serão exibidas na vertical (uma linha por valor de coluna).


Exemplos:
df.show()
df.show(truncate=3)
df.show(vertical=True)
df.show(n=20, truncate=True, vertical=False)


Dataframe.select():

Retorna um novo dataframe de acordo com as expressões que foram determinadas.


Exemplos:
df.select('*').collect()
df.select('name', 'age').collect()
df.select(df.name, (df.age + 10).alias('age')).collect()


Dataframe.Filter():

Filtra linhas usando determinadas condições.
Obs: where() é um apelido do filter().


Exemplos:
df.filter(df.age > 3).collect()
df.where(df.age == 2).collect()
df.filter("age > 3").collect()
df.where("age = 2").collect()


Dataframe.distinct():

Retorna um novo dataframe contendo as linhas distintas no dataframe.


Exemplos:
df.distinct().count()


Dataframe.withColumn():

Retorna um novo dataframe adicionado a coluna ou renomeando a coluna que possui o mesmo nome.


Exemplo:
df.withColumn('age2', df.age + 2).collect()


Dataframe.printSchema():

Exibe a estrutura do dataframe no formato de árvore.


Sintaxe:
df.printSchema()


Dataframe.columns:

Retorna todas as colunas como uma lista.


Sintaxe:
df.columns


Dataframe.count():

Retorna o número de linhas do dataframe.


Sintaxe:
df.count()


Dataframe.describe():

Computa as estatísticas para colunas númericas e string.


Exemplos:
df.describe().show()
df.describe(['age']).show()


Dataframe.summary():

Computa statisticas especificas para colunas númericas e strings.


Exemplos:
df.summary().show()
df.summary("count", "min", "25%", "75%", "max").show()
df.select("age", "name").summary("count").show()


Dataframe.Collect():

Retorna todos os registros como uma lista de linha.


Sintaxe:
df.collect()


Dataframe.selectExpr():

Retorna um novo dataframe de acordo com as expressões SQL que foram determinadas.


Exemplo:
df.selectExpr("age * 2", "abs(age)").collect()


Dataframe.persist():

Determina o nível de armazenamento para persistir o conteúdo do dataframe durante as operações após ser computado pela primeira vez.


Sintaxe:
df.persist()


Dataframe.unpersist():

Marca o dataframe como não persistente, removendo todos os blocos da memória e disco.


Sintaxe:
df.unpersist(blocking=False)


Data publicação: 22:29 07/07/2021
Perfil
Olá jovem Padawan, seja bem vindo! Este site foi criado com o intuito de compartilhar um pouco de conhecimento de Tecnologia da Informação, Big Data, Banco de Dados e Business Intelligence.

GitHub  Linkedin  Youtube

"Eu não sei como vencer os outros; sei apenas como vencer a mim mesmo." (Yagu Munenori)


Leandro Sacramento, Todos os direitos reservados - 2012 - 2022