PySpark Basics: Study Summary🍎
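I. AWS architecture
① AWS Glue: the working platform, covering script authoring, managing script run status, and scheduling (mainly database configuration, writing ETL and data-transformation scripts, and scheduling)
② Amazon S3 data lake (data warehouse): data storage
③ Athena: a workbench for writing and running SQL queries directly (incurs charges)
④ QuickSight: report presentation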
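II. Granting QuickSight account access
Click the user avatar > Manage QuickSight > Invite users > invite by email > choose the shared folder > on that folder, select the users to authorize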
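III. What is SaaS
SaaS is used across industries such as e-commerce, healthcare, real estate, and logistics, and these typically use HubSpot.
From a product perspective, HubSpot has three products, Marketing, CRM, and Sales, which together deliver end-to-end Inbound Marketing.
Marketing is the core, offering tools for SEO, social media, web page building and optimization, and website grading; CRM provides data visualization and automatically tracks customer behavior; Sales is the tool that connects sales staff with customers.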
IV. Creating temporary views:
# Valid until the SparkSession that created it ends
df.createOrReplaceTempView("tempViewName")
# Valid until the Spark application ends
df.createOrReplaceGlobalTempView("tempViewName")
V. Dropping temporary views:
spark.catalog.dropTempView("tempViewName")
spark.catalog.dropGlobalTempView("tempViewName")
VI. Basic demo
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "hello", table_name = "hello_public_t_product", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
import datetime
from datetime import datetime, timedelta
from pyspark.sql.types import StringType
today_str = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y-%m-%d")
today_int = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y%m%d")
yesterday_str = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y-%m-%d")
yesterday_int = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y%m%d")
# Database and table names to connect to
# read data from storage
contacts = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "hive_t_test", transformation_ctx = "datasource0")
print('start')
df = contacts.toDF().select("contact_id","dt")
df.show(5)
print('end')
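The four date strings in the demo above share one pattern: shift the current time back by a fixed number of hours, then format it. A minimal sketch of that pattern as a helper follows. The name shifted_date_strings and its parameters are hypothetical, and the 4-hour shift is treated as a fixed adjustment chosen by the author, since the source does not say which time zone it targets.

```python
from datetime import datetime, timedelta

def shifted_date_strings(now, offset_hours=4, days_back=0):
    """Return (YYYY-MM-DD, YYYYMMDD) for `now` shifted back by the offset plus whole days."""
    shifted = now - timedelta(hours=offset_hours + 24 * days_back)
    return shifted.strftime("%Y-%m-%d"), shifted.strftime("%Y%m%d")

# A fixed timestamp keeps the result reproducible; a job would pass datetime.now()
now = datetime(2022, 6, 13, 2, 30)
today_str, today_int = shifted_date_strings(now)
yesterday_str, yesterday_int = shifted_date_strings(now, days_back=1)
print(today_str, today_int, yesterday_str, yesterday_int)
```

Because 02:30 minus 4 hours falls on the previous calendar day, the "today" strings here land on June 12, which is the effect the hour offset is there to produce.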
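VII. Dynamic partition overwrite mode (Spark 2.3.0 and later)
Spark setting: spark.sql.sources.partitionOverwriteMode=STATIC  # the default is STATIC
This setting applies when overwriting data in a partitioned table. In static mode, Spark deletes the other existing partitions before inserting; in dynamic mode it leaves the other partitions alone and overwrites only the partitions that receive new data.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")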
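The difference between the two modes can be pictured with a toy model in plain Python. This is not Spark code: the function insert_overwrite and the dict-of-partitions layout are assumptions made only to illustrate which partitions survive an overwrite.

```python
def insert_overwrite(table, new_rows, mode):
    """Model an overwrite on a partitioned table.

    table:    dict mapping partition value -> list of rows already stored
    new_rows: dict of the same shape holding the data being written
    mode:     "static" or "dynamic"
    """
    if mode == "static":
        # Static: every existing partition is dropped before the write
        result = {}
    elif mode == "dynamic":
        # Dynamic: partitions that receive no new data are left alone
        result = dict(table)
    else:
        raise ValueError(f"unknown mode: {mode}")
    result.update(new_rows)
    return result

table = {"20220611": ["a"], "20220612": ["b"]}
new_rows = {"20220612": ["c"]}
static_result = insert_overwrite(table, new_rows, "static")
dynamic_result = insert_overwrite(table, new_rows, "dynamic")
print(static_result)
print(dynamic_result)
```

In the static case the 20220611 partition is gone after the write, which is why daily jobs that rewrite a single dt partition usually want the dynamic setting.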
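VIII. Join notes:
Whether duplicate join-key columns survive depends on how the join condition is written rather than on the join type: joining on a column expression (df1.id == df2.id) keeps both key columns, while joining on a column name or a list of names (on=['id']) keeps a single merged key column.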
🍌Coding tips, part 1
1.Filter and deduplicate
contact_df = contact_user_bind.toDF().filter(col('status')=='match').select('user_id','contact_id').dropDuplicates()
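-- dropDuplicates() with no arguments removes fully duplicated rows; pass a list of column names to keep one row per distinct combination of those columns
2.Data type conversion: withColumn/select/selectExpr/sql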
https://blog.csdn.net/nanfeizhenkuangou/article/details/121802837
3.Add a column + subtract a time interval
product_export_df = product_export_df.withColumn('export_time_us', product_export_df.create_time - expr('INTERVAL 4 HOURS'))
4.Add a column + when condition
store_order_detail_df = store_order_detail_df.withColumn("servicebytrendsi", when((col("fulfillment_service") =='trendsi')|(col("fulfillment_service") =='trendsi-fashion-dropshipping'), 1).otherwise(0))
5.Table joins (🌟)
store_order_merge = store_order_df.join(store_order_detail_df,store_order_df.id == store_order_detail_df.store_order_id,how='left')
#----------------
order_detail_total.join(sku_df.select('sku_id','color'),['sku_id'])
6.String concatenation (concatenating columns)
user_df = user_df.withColumn('user_name',concat(user_df.first_name,lit(" "),user_df.last_name))
7.Setting the date format
user_df = user_df.withColumn('sign_up_date', date_format(user_df.create_time-expr('INTERVAL 4 HOURS'),"MM/dd/yyyy"))
#------------------------
this_month = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y-%m")
8.Dropping rows that contain null values
store_df = store_df.dropna(subset=['user_id'])
9.Keeping rows where a column matches specific values
order_df = order_df.filter(col('order_type').isin(1,3))
10.Combining conditions on multiple columns with &
order_df = order_df.filter((~col('user_id').isin(327,166))&(col('pay_time_us')>'2021-04-01'))
11.when…then
order_df_detail = order_df_detail.withColumn('weight',when((col('weight')<=300)|(col('weight').isNull()),300).otherwise(col('weight')))
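12.Null filtering + single and double quotes
isNotNull() filters out null values. In Python code, single and double quotes are interchangeable string delimiters; in Athena SQL, single quotes delimit string literals and double quotes delimit identifiers.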
13.Multiple conditions
write_dataframe = write_dataframe.withColumn('status',when(col('deal_id').isNull(),lit("miss")).when(col('user_type').isNull(),lit("extra")).otherwise('match'))
14.Date functions
- timedelta (time interval): miyaBirthdayNextday=miyaBirthday+datetime.timedelta(days=1)
- datetime.timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0)
- strftime (date formatting): print(datetime.date.today().strftime('%d/%m/%Y'))
- date_format (date formatting): contacts = contacts.withColumn('date', date_format(col('create_time'),"yyyyMMdd"))
15.Regex string replacement function
Reference: "How to use REGEXP_REPLACE", JohnnyChu's blog on CSDN
16.Set difference with subtract()
actual = result_mapping.select("user_id").subtract(result_mapping1.select('user_id')).dropDuplicates()
17.union/union all: merging DataFrames (add distinct() to deduplicate)
write_dataframe = result_mapping1.union(result_mapping2).union(result_mapping3)
18.Window functions✨
# Note: requires this import
from pyspark.sql.window import Window
write_dataframe = write_dataframe.withColumn("rank", row_number().over(Window.partitionBy("contact_id").orderBy(desc("spu_sale_amount"))))
19.Timestamp format conversion
-- PostgreSQL format pattern
create_time >= to_timestamp('2022-01-03 00:00:00', 'yyyy-mm-dd hh24:mi:ss')
20.explode: array elements to rows (splitting data)
columns = ["Seqno","Name"]
data = [("1", "john jones"),
        ("2", "tracey smith"),
        ("3", "amy sanders")]
df = spark.createDataFrame(data=data,schema=columns)
df.withColumn("split_name",explode(split(col("Name")," "))).show()
🍑Coding tips, part 2
21.Data type conversion
SELECT CONCAT(CAST(round((3/21)*100,3) AS CHAR),'%') as aa from act_canal;
22.Getting the week number from a date
SELECT weekofyear('2022-06-13');
23.SparkSQL date format conversion
SELECT DATE_FORMAT(NOW(),'yyyy-MM');
-----mysql-----
DATE_FORMAT(SYSDATE(),'%Y-%m-%d %H:%i:%s')
-----postgres-----
to_timestamp('2022-01-03 00:00:00', 'yyyy-mm-dd hh24:mi:ss')
SELECT TO_CHAR(CURRENT_DATE, 'yyyy-mm-dd');
24.Getting the week of the month from a date
SELECT weekofyear('2022-06-13')-23;
#----------------------------------
df_student.withColumn("week_of_month", date_format(col("birthday"), "W")).show()
25.groupBy on multiple columns
store_order_merge.groupBy(['user_id','year_month','year_last_month']).agg(sum(col('price')*col('num')).alias("total_pay_amount"))
26.Joining on multiple conditions
write_dataframe = write_dataframe.join(trendsi_sales_china,on=['user_id','year_last_month','year_month'],how='outer')
#-----
df1.join(df2, on=[df1['age'] == df2['age'], df1['sex'] == df2['sex']], how='left_outer')
27.Merging data
df1.unionByName(df2).show()
28.Subtracting months from a date
store_order_df = store_order_df.withColumn('last_month_pay_time', col('pay_time') - expr('INTERVAL 1 MONTHS'))
29.union+union all+unionByName
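df1.unionByName(df2, allowMissingColumns=True).show(): allows the two DataFrames to have different columns; missing columns are filled with null
df1.unionByName(df2).show(): merges by column name (both DataFrames must have the same set of columns)
union: merges two DataFrames by position rather than by column name; the result takes its column names from the first DataFrame (a.union(b) follows a's column order). Duplicate rows are kept; call distinct() afterwards to remove them
unionAll: same as union (the two behave identically in PySpark)
unionByName: merges by column name rather than by position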
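The positional versus by-name distinction is easy to get wrong when two DataFrames list the same columns in a different order. The following is a plain-Python model of the two behaviours, not PySpark itself; the helper names union_by_position and union_by_name and the tuple-based rows are assumptions for illustration. It also reflects that PySpark's union behaves like SQL UNION ALL and keeps duplicate rows.

```python
def union_by_position(cols_a, rows_a, rows_b):
    """Model union: rows are appended as-is; column names come from the first frame."""
    return cols_a, rows_a + rows_b

def union_by_name(cols_a, rows_a, cols_b, rows_b):
    """Model unionByName: reorder the second frame's values to match the first frame's columns."""
    if set(cols_a) != set(cols_b):
        raise ValueError("column sets differ")
    order = [cols_b.index(c) for c in cols_a]
    return cols_a, rows_a + [tuple(row[i] for i in order) for row in rows_b]

cols_a, rows_a = ["id", "name"], [(1, "a")]
cols_b, rows_b = ["name", "id"], [("b", 2), ("b", 2)]

_, positional = union_by_position(cols_a, rows_a, rows_b)
_, by_name = union_by_name(cols_a, rows_a, cols_b, rows_b)
print(positional)  # values from the second frame end up under the wrong column names
print(by_name)     # values are aligned to id/name; the duplicate row is still present
```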
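30.Days in the current month + current day of the month
from dateutil.relativedelta import relativedelta
# Read the clock once so both month boundaries share the same time of day
now = datetime.now()
start_day_of_next_month = (now + relativedelta(months=1)).replace(day=1)
start_day_of_this_month = now.replace(day=1)
# The gap between the two first-of-month values is the month length
this_month_days=(start_day_of_next_month-start_day_of_this_month).days
# Day of the month, counting the 1st as day 1
this_month_days_now=(now -start_day_of_this_month).days+1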
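The same two numbers can be obtained from the standard library alone, which avoids the dateutil dependency. A minimal sketch, assuming the hypothetical helper name month_progress:

```python
import calendar
from datetime import datetime

def month_progress(now):
    """Return (number of days in the month of `now`, day of the month of `now`)."""
    # monthrange returns (weekday of the 1st, number of days in the month)
    _, days_in_month = calendar.monthrange(now.year, now.month)
    return days_in_month, now.day

feb_days, feb_day = month_progress(datetime(2024, 2, 10))
jun_days, jun_day = month_progress(datetime(2022, 6, 13))
print(feb_days, feb_day, jun_days, jun_day)
```

calendar.monthrange handles leap years, so February 2024 is reported with 29 days.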
31.Count, mean, standard deviation, min, and max
df = spark.createDataFrame(
[("Bob", 13, 40.3, 150.5), ("Alice", 12, 37.8, 142.3), ("Tom", 11, 44.1, 142.2)],
["name", "age", "weight", "height"],
)
df.describe(['age']).show()
32.Dropping duplicate rows
df.dropDuplicates().show()
33.Intersection of two datasets (intersectAll keeps duplicates)
df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
df1.intersectAll(df2).sort("C1", "C2").show()
34.Iterating with a UDF-style function (mapInPandas)
from pyspark.sql.functions import pandas_udf
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, df.schema).show()
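35.Merging multiple DataFrames (union in a loop)
# The DataFrames must have the same columns, otherwise they cannot be merged
from functools import reduce
buff = []
for pdfs in [d1, d2,d3]:
    buff.append(pdfs)
mergeDF = reduce(lambda x,y: x.union(y), buff)
mergeDF.show()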
36.Dates 7, 14, and 30 days back
days7_str = (datetime.now() - timedelta(hours=176, minutes=0)).strftime("%Y-%m-%d")
days7_int = (datetime.now() - timedelta(hours=176, minutes=0)).strftime("%Y%m%d")
days14_str = (datetime.now() - timedelta(hours=344, minutes=0)).strftime("%Y-%m-%d")
days14_int = (datetime.now() - timedelta(hours=344, minutes=0)).strftime("%Y%m%d")
days30_str = (datetime.now() - timedelta(hours=728, minutes=0)).strftime("%Y-%m-%d")
days30_int = (datetime.now() - timedelta(hours=728, minutes=0)).strftime("%Y%m%d")
37.PySpark window function over()
# Define the partitioning and ordering
window = Window.partitionBy(['spu_id']).orderBy(['spu_co'])
# Rank the rows, keep rank = 1, and drop the rank column
sku_df = sku_df.withColumn('rank', rank().over(window)).filter("rank= '1'").drop('rank')
38.concat+weekofyear()
t_order_df = t_order_df.withColumn('year_week',concat(concat(year(col('pay_time')),lit('-')),weekofyear(col('pay_time'))))
39.Type conversion with cast() + year-week format
china_order_df_week = china_order_df.filter(col('year_month_week')== concat(lit(this_month),lit('-'),lit(weekofyear(current_date()).cast(StringType())),lit("W")))
🍉Coding tips, part 3
40.Handling array data in PySpark
from pyspark.sql.functions import explode_outer
""" with array """
df.select(df.name,explode_outer(df.knownLanguages)).show()
""" with map """
df.select(df.name,explode_outer(df.properties)).show()
41.Null checks
column.isNotNull() ------------- column.isNull()
42.Get String length
from pyspark.sql.functions import length
df_books.where(length(col("book_name")) >= 20).show()
43.row to col✨ (pivoting rows into columns)
from pyspark.sql import functions as F
keys=['is_jit','after_reason_type','supplier_id']
column='after_reason'
column_value='value'
column_value_list=['']
df.groupby(keys).pivot(column, column_value_list).agg(F.first(column_value, ignorenulls=True)).fillna('*')
44.Reading specific partitions✨
# dw_order_cut
partitionstate = "dt >='2021-01-01'"
#partitionstate = "dt >= '"+days30_int+"'"
partitionstate1 = "dt = '"+yesterday_int+"'"
partitionstate2 = "time_us_hour = '0'"
partitionstate3 = "dt = '"+cday_int+"'"
source_t_sku = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "hive_t_sku", push_down_predicate = partitionstate1)
45.Dates: 7 days and 30 days back
days7_str = (datetime.now() - timedelta(hours=((24*7)+8), minutes=0)).strftime("%Y-%m-%d")
days7_int = (datetime.now() - timedelta(hours=((24*7)+8), minutes=0)).strftime("%Y%m%d")
day30_str = (datetime.now() - timedelta(hours=((24*30)+8), minutes=0)).strftime("%Y-%m-%d")
day30_int = (datetime.now() - timedelta(hours=((24*30)+8), minutes=0)).strftime("%Y%m%d")
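46.Rendering a DataFrame as a string with to_string()
# Assumes ps is the pandas API on Spark
import pyspark.pandas as ps
df = ps.DataFrame({'col1': [1, 2, 3], 'col2': [4, 5, 6]}, columns=['col1', 'col2'])
print(df.to_string())
#    col1  col2
# 0     1     4
# 1     2     5
# 2     3     6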
47.Substring extraction: substr✨ and substring()
b=a.withColumn("Sub_Name",a.Name.substr(1,3)).show()
Reference: https://zhuanlan.zhihu.com/p/34901943
df.withColumn('year', substring('date', 1,4))\
.withColumn('month', substring('date', 5,2))\
.withColumn('day', substring('date', 7,2))
48.UDF: yyyyMMdd to yyyy-MM-dd✨
from datetime import datetime
from pyspark.sql.functions import col, udf, date_format, to_timestamp
from pyspark.sql.types import DateType
# createDataFrame needs rows (tuples), not bare strings
rdd = sc.parallelize([('20161231',), ('20140102',), ('20151201',), ('20161124',)])
df1 = spark.createDataFrame(rdd, ['old_col'])
# UDF to convert string to date
func = udf (lambda x: datetime.strptime(x, '%Y%m%d'), DateType())
df = df1.withColumn('new_col', date_format(func(col('old_col')), 'yyyy-MM-dd'))
df.show()
# Built-in alternative that needs no UDF
write_dataframe = write_dataframe.withColumn('date', to_timestamp('dt', 'yyyyMMdd'))
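A UDF body like the lambda above raises an exception when a row holds null or a malformed string. Below is a minimal sketch of a more defensive conversion function that could be wrapped in udf(...); the name compact_to_iso is hypothetical, and it returns a string rather than a date, so the wrapping UDF would use StringType under that assumption.

```python
from datetime import datetime

def compact_to_iso(value):
    """Convert a YYYYMMDD string to YYYY-MM-DD, returning None for bad input."""
    try:
        return datetime.strptime(value, "%Y%m%d").strftime("%Y-%m-%d")
    except (TypeError, ValueError):
        # TypeError covers None, ValueError covers strings in another layout
        return None

converted = [compact_to_iso(v) for v in ["20161231", "20140102", "2016-12-31", None]]
print(converted)
```

Testing the function in plain Python first is cheaper than debugging a failed Spark task.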
49.dayofweek()
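# dayofweek: Sunday through Saturday map to 1 through 7 (Sunday is 1, Monday is 2, ...)
To get the Monday-first convention common in China, convert the value yourself; the demo below is Scala and uses when/otherwise
// Add days_of_week: the day of the week, with Monday as day 1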
.withColumn("days_of_week", when(dayofweek(from_unixtime(col("unix_time"), "yyyy-MM-dd")) === 1, 7)
.otherwise(dayofweek(from_unixtime(col("unix_time"), "yyyy-MM-dd")) -1)
.cast(LongType))
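The when/otherwise expression above implements a small mapping that can be checked in plain Python before it goes into a job. A minimal sketch, with the hypothetical function name spark_dayofweek_to_monday_first:

```python
def spark_dayofweek_to_monday_first(spark_dow):
    """Map Spark's dayofweek (1=Sunday ... 7=Saturday) to 1=Monday ... 7=Sunday."""
    if not 1 <= spark_dow <= 7:
        raise ValueError("dayofweek must be between 1 and 7")
    # Sunday moves to the end of the week; every other day shifts down by one
    return 7 if spark_dow == 1 else spark_dow - 1

mapping = {d: spark_dayofweek_to_monday_first(d) for d in range(1, 8)}
print(mapping)
```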
50.String concatenation with concat
user_tag.withColumn('iso_week_agg',concat(col('iso_year'),lit('-'),col('iso_week')))
Similar function: concat_ws()
51.countDistinct: counting distinct values
user_tag.groupby(['iso_week_agg']).agg(countDistinct(col('user_id')).alias('user_reg_number'))
52.Handling weeks that span a year boundary
# Load the source data
user_tag = source_user_tag.toDF().select('user_id','create_time')
user_tag.show(20)
# get source + etl + year-week: handle weeks that span a year boundary
iso_weekday = when(dayofweek('create_time') != 1, dayofweek('create_time')-1).otherwise(7)
week_from_prev_year = (month('create_time') == 1) & (weekofyear('create_time') > 9)
week_from_next_year = (month('create_time') == 12) & (weekofyear('create_time') == 1)
iso_year = when(week_from_prev_year, year('create_time') - 1) \
.when(week_from_next_year, year('create_time') + 1) \
.otherwise(year('create_time'))
user_tag = user_tag.withColumn('iso_year', iso_year)
user_tag = user_tag.withColumn('iso_week', lpad(weekofyear('create_time'), 3, "W0"))
user_tag = user_tag.withColumn('iso_week_agg',concat(col('iso_year'),lit('-'),col('iso_week')))
user_tag.show(20)
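Python's standard library exposes the ISO year and week directly, which makes it a convenient way to cross-check the Spark logic above on a few boundary dates. A minimal sketch; the function name iso_week_label is hypothetical, and the label format mirrors the iso_week_agg column built above.

```python
from datetime import date

def iso_week_label(d):
    """Format a date as ISO year and zero-padded ISO week, e.g. 2020-W53."""
    iso_year, iso_week, _ = d.isocalendar()
    return f"{iso_year}-W{iso_week:02d}"

samples = (date(2021, 1, 1), date(2019, 12, 30), date(2022, 6, 13))
labels = [iso_week_label(d) for d in samples]
print(labels)
```

January 1, 2021 belongs to the last ISO week of 2020, and December 30, 2019 already belongs to the first ISO week of 2020, which are the two cases the week_from_prev_year and week_from_next_year conditions handle.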
53.collect_list 与 collect_set
Reference: "PySpark series: collect_list and collect_set worked examples", Happy Coding Lab (chilunhuang.github.io)
collect_list: gathers the values of each groupBy group into an array; collect_set: does the same but removes duplicates
54.Lpad & Rpad
#### Add both leading and Trailing space
# Pad the column to the given length with the given fill character
df_states = df_states.withColumn('states_Name_new', lpad(df_states.state_name,20, '#'))
df_states = df_states.withColumn('states_Name_new', rpad(df_states.states_Name_new,24, '#'))
df_states.show()
55.replace
Reference: "pyspark.sql.DataFrame.replace usage and code examples", 纯净天空 (vimsky.com)
56.✨Cohort analysis by day (retention rate)
# Cohort analysis: by day
# 1. Find each customer's first purchase date
t_order = source_t_order.toDF().select('user_id','create_time')
t_order = t_order.withColumn('create_time',date_format(col('create_time'),"yyyy-MM-dd")).filter(col('create_time')>'2022-07-01')
t_order.show(20)
from pyspark.sql.window import Window
# with min(create_time)
t_order = t_order.withColumn('FirstPurchaseDate',min('create_time').over(Window.partitionBy('user_id').orderBy(col('create_time'))))
t_order.show(20)
# 2. Find the return interval (days from the first purchase to each order)
t_order = t_order.withColumn('ReturnDays',datediff('create_time','FirstPurchaseDate'))
t_order.show(20)
# 3. Count customers that share the same first purchase date and return interval
t_order = t_order.groupby(['FirstPurchaseDate','ReturnDays']).agg(count('user_id').alias('count_user_id'))
t_order.show(20)
# 4. Build the cohort matrix
t_order = t_order.groupby('FirstPurchaseDate').pivot('ReturnDays').agg(first('count_user_id')).orderBy('FirstPurchaseDate')
t_order.show(20)
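The four cohort steps can be traced on a handful of rows in plain Python, which helps when checking what the pivoted matrix should contain. This is a sketch on made-up data, not the Spark job: the orders list is hypothetical, and like count('user_id') in the job it counts order rows rather than distinct users.

```python
from collections import Counter
from datetime import date

# Hypothetical (user_id, order_date) rows standing in for t_order
orders = [
    (1, date(2022, 7, 2)), (1, date(2022, 7, 5)),
    (2, date(2022, 7, 2)), (2, date(2022, 7, 3)),
    (3, date(2022, 7, 3)),
]

# Step 1: first purchase date per user
first_purchase = {}
for user_id, day in orders:
    first_purchase[user_id] = min(day, first_purchase.get(user_id, day))

# Steps 2 and 3: days since first purchase, counted per (cohort date, gap)
counts = Counter(
    (first_purchase[user_id], (day - first_purchase[user_id]).days)
    for user_id, day in orders
)

# Step 4: pivot into {cohort date: {return days: row count}}
matrix = {}
for (cohort, gap), n in sorted(counts.items()):
    matrix.setdefault(cohort, {})[gap] = n

print(matrix)
```

Column 0 of each row is the cohort size on its first day; dividing the other columns by it gives the retention rate.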
57.Subtracting two dates
# Get the partition for a given number of days back
dt=date_format(
date_add('Day',-8,current_timestamp),'%Y%m%d')
# Athena + Glue + MySQL
date_diff(create_time,delivery_time)<=7 or date_diff('day',create_time,delivery_time)<=7
#pgsql
date_part('day',create_time-delivery_time)<=7
# pgsql date arithmetic
to_char(delivery_time + INTERVAL '7 day'-INTERVAL '4 hour','yyyy-mm-dd')='2022-07-18'
# Athena date arithmetic
SELECT date_add('day', 1, timestamp '2022-11-30 10:30:00') AS new_date;
SELECT date_format(date_parse('20230721', '%Y%m%d') - INTERVAL '20' DAY, '%Y%m%d') AS new_date;
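When the same date arithmetic is needed in the Python part of a job, for example to build a partition predicate, the standard library covers both operations. A minimal sketch; shift_compact_date and days_between are hypothetical helper names, not functions of any of the engines above.

```python
from datetime import datetime, timedelta

def shift_compact_date(value, days):
    """Shift a YYYYMMDD string by a number of days and return YYYYMMDD."""
    parsed = datetime.strptime(value, "%Y%m%d")
    return (parsed + timedelta(days=days)).strftime("%Y%m%d")

def days_between(start, end):
    """Whole days from start to end, both given as YYYY-MM-DD strings."""
    fmt = "%Y-%m-%d"
    return (datetime.strptime(end, fmt) - datetime.strptime(start, fmt)).days

new_date = shift_compact_date("20230721", -20)
gap = days_between("2022-07-11", "2022-07-18")
print(new_date, gap)
```

The first call mirrors the Athena query that subtracts 20 days from 20230721.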
58.Substring with SUBSTR
a.withColumn("Sub_Name",a.Name.substr(1,3)).show()  # take three characters starting at the first character
59.over() windowing: rank()/dense_rank()/row_number()
# over(partition by order by)
from pyspark.sql.window import Window
import pyspark.sql.functions as F
windowSpec = Window.partitionBy("iso_week_agg").orderBy(F.asc("pay_iso_week_agg"))
order_df = order_df.withColumn("dense_rank", F.dense_rank().over(windowSpec))
order_df.show(50)
🍇Coding tips, part 3 (continued)
60.add column null
test = test.withColumn('order_id',lit(None))
61.Writing detail rows to a database
t_sku_predict_7days.write.format('jdbc').options(url='jdbc:mysql://xxxxxx:3306/DW',driver='com.mysql.jdbc.Driver',dbtable='lkb_test_01',user='xxxxxx',password='xxxxxx').mode('overwrite').save()
62.Common PySpark functions: ceil …
https://blog.csdn.net/weixin_39998521/article/details/110598705
63.get_json_object
#transform sku data
sku_df = source_t_sku.toDF().filter(col('deleted')==0).select(col('id').alias('sku_id'),'sku_code',col('spu_id').alias('spu_id'),'cogs_price','image',col('dimension_values').alias('property'),'barcode','third_scan_code')
sku_df = sku_df.withColumn('color', get_json_object(col("property"),"$.Color").alias("color"))
64.pgsql date format conversion
to_char(date,varchar) select to_char(current_date - 1,'YYYY-MM-dd')
to_date('2012-05-01 23:59:59','yyyy-mm-dd hh24:mi:ss') char ,char
to_timestamp(char,char)
select cast(to_char(current_date - 1,'YYYY-MM-dd') AS DATE)
cloudquery
select
pay_time,spu_id,sum(num)
from
lkb_test_skc1 where spu_id=27490 and date_format(pay_time,'%Y-%m-%d')<date'2022-09-08' and date_format(pay_time,'%Y-%m-%d') >date'2021-04-01' group by pay_time,spu_id
65.Splitting a column with split
sku_df = sku_df.withColumn("spu_co", split(col("barcode"), "-").getItem(0))
66.PySpark SparkSQL demo (read S3 and update the Athena catalog metadata)
# This approach is deprecated; an AWS Glue crawler can do this directly
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
import pymysql
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "hello", table_name = "hello_public_t_product", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
import datetime
from datetime import datetime, timedelta
today_str = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y-%m-%d")
today_int = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y%m%d")
yesterday_str = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y-%m-%d")
yesterday_int = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y%m%d")
''' --- Read a partitioned table ---
# Read the parquet files
PATH = "s3://xxxxxxx/xxxxx/xxxxxx/"
df = spark.read.parquet(PATH +"dt="+yesterday_int+"/*")
df = df.withColumn('dt',lit(yesterday_int))
# Show the DataFrame
df.show(20)
print('begin')
DataSink5 = glueContext.getSink(
path = PATH,
connection_type = "s3",
updateBehavior = "UPDATE_IN_DATABASE",
partitionKeys = ["dt"],
enableUpdateCatalog = True,
transformation_ctx = "DataSink5")
DataSink5.setCatalogInfo(
catalogDatabase = "ods",
catalogTableName = "ods_t_sku")
from awsglue.dynamicframe import DynamicFrame
NewDynamicFrame = DynamicFrame.fromDF(df, glueContext, "nested")
DataSink5.setFormat("glueparquet")
DataSink5.writeFrame(NewDynamicFrame)
print('end')
'''
# --- Unpartitioned ---
# Read the parquet files
PATH = "s3://xxxxxx/xxx/xx/xxxxxxx/"
df = spark.read.parquet(PATH +"*")
# Show the DataFrame
df.show(20)
print('begin')
DataSink5 = glueContext.getSink(
path = PATH,
connection_type = "s3",
updateBehavior = "UPDATE_IN_DATABASE",
enableUpdateCatalog = True,
transformation_ctx = "DataSink5")
DataSink5.setCatalogInfo(
catalogDatabase = "ods",
catalogTableName = "ods_t_carton")
from awsglue.dynamicframe import DynamicFrame
NewDynamicFrame = DynamicFrame.fromDF(df, glueContext, "nested")
DataSink5.setFormat("glueparquet")
DataSink5.writeFrame(NewDynamicFrame)
print('end')
# query = '''
# SELECT *
# FROM hello.xxxxxx
# LIMIT 5;
# '''
#
# result = spark.sql(query)
# result.show()
67.Getting a column's data type
#Get datatype of birthday column
df_student.select("birthday").dtypes
68.Changing a column's data type (verified with dtypes)
df = df.withColumn("kills",df.kills.astype("int"))
df.select("kills").dtypes
69.lag & lead
https://zhuanlan.zhihu.com/p/202969159
70.Cumulative sum over a column
https://blog.csdn.net/XnCSD/article/details/90676259
71.Python notification script
import sys
import smtplib
import datetime
from email.mime.text import MIMEText
# Mail settings
mail_host = "smtp.163.com" # mail server address
mail_port = 465 # mail server port
mail_user = "xxxxxxxx@163.com"
mail_pass = "xxxxxxxxxx"
mail_to = ["xxxxxxxx@qq.com","xxxxxxx@qq.com"] # use a list for multiple recipients
# Mail content
now = datetime.datetime.now()
now = now + datetime.timedelta(hours=8) # add 8 hours to the current time = cn time
print(now)
if now.hour >= 0 and now.hour < 2:
    msg = MIMEText("xxxxxxxx") # mail body
elif now.hour >= 10 and now.hour < 12:
    msg = MIMEText("xxxxxxx") # mail body
else:
    msg = MIMEText("xxxxxxx") # mail body
# Mail headers
msg['From'] = mail_user
msg['To'] = ",".join(mail_to) # join multiple recipient addresses with commas
msg['Subject'] = "whatever"
# Send the mail
smtp = smtplib.SMTP_SSL(mail_host, mail_port)
smtp.login(mail_user, mail_pass)
smtp.sendmail(mail_user, mail_to, msg.as_string())
smtp.quit()
print('success')
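Separating message construction from sending makes the hour-based branching testable without an SMTP server. A minimal sketch under stated assumptions: build_message is a hypothetical helper, the body texts and example.com addresses are placeholders, and the input time is assumed to be UTC, as the +8 hour shift in the script suggests.

```python
from datetime import datetime, timedelta
from email.mime.text import MIMEText

def build_message(utc_now, sender, recipients, subject):
    """Build the notification mail; the body depends on the hour in UTC+8."""
    cn_now = utc_now + timedelta(hours=8)
    if 0 <= cn_now.hour < 2:
        body = "midnight run"   # placeholder text
    elif 10 <= cn_now.hour < 12:
        body = "morning run"    # placeholder text
    else:
        body = "other run"      # placeholder text
    msg = MIMEText(body, "plain", "utf-8")
    msg["From"] = sender
    msg["To"] = ",".join(recipients)
    msg["Subject"] = subject
    return msg

msg = build_message(
    datetime(2022, 6, 13, 17, 0),
    "sender@example.com",
    ["a@example.com", "b@example.com"],
    "job status",
)
body_text = msg.get_payload(decode=True).decode("utf-8")
print(msg["To"], body_text)
```

The returned object can be handed to smtp.sendmail(sender, recipients, msg.as_string()) exactly as in the script above.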
72.dropna: dropping rows that contain null
https://blog.csdn.net/qq_39954916/article/details/120584754
73.quicksight-string-date🪐
parseDate(concat(substring({dt},1,4),'-',substring({dt},5,2),'-',substring({dt},7,2)),'yyyy-MM-dd')
74.Parsing URLs in PySpark
t_traffic_detail = t_traffic_detail.withColumn('utm_campaign' ,expr("parse_url(url,'QUERY', 'utm_campaign')"))
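To sanity-check which value a query parameter should yield before running the Spark expression, the same extraction can be done locally with the standard library. A minimal sketch; query_param is a hypothetical helper and the URL is made up for illustration.

```python
from urllib.parse import urlparse, parse_qs

def query_param(url, name):
    """Return the first value of a query-string parameter, or None if it is absent."""
    values = parse_qs(urlparse(url).query).get(name)
    return values[0] if values else None

# Hypothetical URL for illustration
url = "https://example.com/products?utm_campaign=summer_sale&curPage=2"
campaign = query_param(url, "utm_campaign")
missing = query_param(url, "utm_source")
print(campaign, missing)
```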
75.PySpark: reading only part of a database table by condition, for efficiency✨ (PostgreSQL in this example)
print('start')
#store_product = glueContext.create_dynamic_frame.from_catalog(database = "hello", table_name = "trendsi_public_t_store_product", transformation_ctx = "datasource0")
query= "(select * from t_test where update_time >= (date(now()) - interval '1 day' +interval '4 hour') and update_time <(date(now()) +interval '4 hour')) as testresult"
store_product = spark.read.format("jdbc").option("url", "jdbc:postgresql:xxxxxxxxxx").option("driver", "org.postgresql.Driver").option("dbtable", query).option("user", "xxxxxx").option("password", "xxxxxxxxxx").load()
print('success')
76.Athena-explode(unnest)🪐
-- 1. Can be run directly for testing
WITH dataset AS (
SELECT
'engineering' as department,
ARRAY['Sharon', 'John', 'Bob', 'Sally'] as users
)
SELECT department, names FROM dataset
CROSS JOIN UNNEST(users) as t(names)
-- 2. Real-world example
select user_id,source from
(select user_id,split(custom_source,';') cs from hive_dw_user_new_tag where length(custom_source)>12 limit 20)
cross join unnest(cs) as t(source)
77.intersection (set intersection)
x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['A','C','D'])
z = x.intersection(y)
print(x.collect())
print(y.collect())
print(z.collect())
78.Picking single elements out of an array
# Add image columns
wms_abnormal = source_wms_abnormal.toDF().select('id','image')
wms_abnormal = wms_abnormal.withColumn('image', regexp_replace('image', '([\[\]"]+)', ''))
wms_abnormal = wms_abnormal.withColumn("split_image",split(col("image"),","))
wms_abnormal = wms_abnormal.withColumn('image1', col('split_image').getItem(0))
wms_abnormal = wms_abnormal.withColumn('image2', col('split_image').getItem(1))
wms_abnormal = wms_abnormal.withColumn('image3', col('split_image').getItem(2))
wms_abnormal = wms_abnormal.drop('split_image')
79.Collapsing duplicate records into one row
from pyspark.sql.functions import collect_list
shopify_store_url = left_join.groupby('user_id').agg(collect_list('domain').alias("shopify_store_url"))
shopify_store_url = shopify_store_url.withColumn("shopify_store_url",concat_ws(",",col("shopify_store_url")))
🧆Coding tips, part 4
80.Glue read S3 to DataFrame🪐
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import *
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "prod-bi", table_name = "biods_t_user", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
import datetime
from datetime import datetime, timedelta
from pyspark.sql.types import StringType
recover_day =datetime.now()- timedelta(hours=4+24*0, minutes=0)
today_str = (recover_day).strftime("%Y-%m-%d")
today_int = (recover_day).strftime("%Y%m%d")
yesterday_str = (recover_day - timedelta(hours=24, minutes=0)).strftime("%Y-%m-%d")
yesterday_int = (recover_day - timedelta(hours=24, minutes=0)).strftime("%Y%m%d")
print('start')
from pyspark.sql import SparkSession
df = spark.read.options(header='True', inferSchema='True', delimiter='&') \
.csv("s3://xxxxxx/xx/xxx-emp")
df.printSchema()
df.show(20)
81.Cast to DecimalType
order_df = order_df.withColumn('us_delivery_cost',when((col('package_weight').isNull())&(col('logistics_company_id').isin(1132,25)),lit(10.95).cast(DecimalType(10,2))).otherwise(col('us_delivery_cost')))
82.Athena URL functions✨
select url_extract_parameter(url,'curPage') from traffic.xxxxxx
83.Athena: extracting array elements✨
select case when cardinality(tmp)=3 then concat(tmp[1],tmp[2],tmp[3])
when cardinality(tmp)=2 then concat(tmp[1],tmp[2])
when cardinality(tmp)=1 then tmp[1] end category123 from
(select split(event_value, '/') tmp from traffic.xxxxx where event_target='page' and event_type='imp' and page_code='product_listing_page' limit 20) a
84.Logical negation in PySpark
order_df = order_df.filter((~col('user_id').isin(327,166))&(col('pay_time')>'2021-04-01')&(col('pay_time')<today_str))
85.Creating a temporary DataFrame
from pyspark.sql.functions import last_day
df = spark.createDataFrame([('1997-02-10',)], ['d'])
df.select(last_day(df.d).alias('date')).show()
86.Limiting the QuickSight data time range (via SQL)
select distinct * from report_inventory_kpi where (dt >= date_format(add_months(current_timestamp,-2),'yyyyMMdd'))
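87.About weekofyear in PySpark
weekofyear follows ISO 8601: a week starts on Monday, and week 1 is the first week with more than 3 days. The function takes only the date column and has no firstDayOfWeek parameter.
from pyspark.sql.functions import weekofyear
df.select(weekofyear('date'))  # ISO week number; weeks start on Monday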
88.Getting the ISO year-week in Athena
select concat(date_format(date_add('day', -1 ,date '2023-01-03'), '%x-%v'),'W')
To be continued…🎈