AWS Glue PySpark + Athena: Basics Study Notes


PySpark Basics Study Notes 🍎


I. AWS architecture

① AWS Glue: the working platform for writing scripts, monitoring job runs, and scheduling (mainly: database/connection configuration, ETL and data-transformation scripts, scheduling)

② Amazon S3 data lake (data warehouse): data storage

③ Athena: a workbench for running SQL queries directly against the data (query charges apply)

④ QuickSight: report and dashboard presentation


II. Granting QuickSight access

Click the user avatar → Manage QuickSight → Invite users → invite by email → choose the shared folder → grant the invited user access to the specified folder


III. What is SaaS

SaaS companies in e-commerce, healthcare, real estate, and logistics commonly use HubSpot.

From a product point of view, HubSpot has three products, Marketing, CRM, and Sales, which together cover the full inbound-marketing workflow.

Marketing is the core, providing SEO, social media, page building and optimization, and website-grading tools; the CRM visualizes data and automatically tracks customer behavior; Sales is the tool that connects sales reps with customers.


IV. Creating temporary views:
# valid until the SparkSession that created it is stopped
df.createOrReplaceTempView("tempViewName")

# valid until the Spark application terminates
df.createOrReplaceGlobalTempView("tempViewName")
V. Dropping temporary views:
spark.catalog.dropTempView("tempViewName")
spark.catalog.dropGlobalTempView("tempViewName")
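A minimal usage sketch (assuming an existing SparkSession named spark and a DataFrame df): a temp view is queried by name, while a global temp view lives in the reserved global_temp database.
df.createOrReplaceTempView("tempViewName")
spark.sql("SELECT COUNT(*) AS cnt FROM tempViewName").show()
df.createOrReplaceGlobalTempView("tempViewName")
spark.sql("SELECT * FROM global_temp.tempViewName LIMIT 5").show()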

VI. Basic demo
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "hello", table_name = "hello_public_t_product", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []


import datetime
from datetime import datetime, timedelta
from pyspark.sql.types import StringType

today_str = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y-%m-%d")
today_int = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y%m%d")


yesterday_str = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y-%m-%d")
yesterday_int = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y%m%d")

# catalog database and table name to read
# read data from storage
contacts = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "hive_t_test", transformation_ctx = "datasource0")

print('start')
df = contacts.toDF().select("contact_id","dt")
df.show(5)
print('end')

VII. Dynamic partition-overwrite mode (Spark 2.3.0+)

Spark config: spark.sql.sources.partitionOverwriteMode=STATIC  # STATIC is the default

This setting applies to overwrite inserts: in static mode, Spark deletes all matching partitions before writing; in dynamic mode, other partitions are left untouched and only the partitions that actually receive data are overwritten.

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
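With dynamic mode enabled as above, a minimal sketch of the effect (the target table name is made up for illustration):
# only the partitions present in df are rewritten; in static mode every existing
# partition of the target table would be dropped before the insert
df.write.mode("overwrite").insertInto("dw.t_order_partitioned")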

VIII. Join caveat:

Whether the join key shows up twice in the result depends on how the condition is written rather than on the join type: passing the keys as a list of column names keeps a single copy of each key column, while an expression condition (df1.id == df2.id) keeps both copies, which then have to be disambiguated. See the sketch below.
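A small sketch (a and b are hypothetical DataFrames that both contain a user_id column):
a.join(b, on=['user_id'], how='left')            # a single user_id column in the result
a.join(b, a.user_id == b.user_id, how='inner')   # both user_id columns are kept and must be disambiguated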


🍌 Coding tips, part one

1. Filter and deduplicate
contact_df = contact_user_bind.toDF().filter(col('status')=='match').select('user_id','contact_id').dropDuplicates()
# dropDuplicates() can deduplicate whole rows or a subset of columns; pass a list of column names to restrict it
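A minimal sketch of the column-restricted form:
contact_df.dropDuplicates(['contact_id'])   # keep one row per contact_id
contact_df.dropDuplicates()                 # deduplicate on all columns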

2. Data type conversion: withColumn / select / selectExpr / sql
https://blog.csdn.net/nanfeizhenkuangou/article/details/121802837
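A brief sketch of two of these forms (column names are made up for illustration):
from pyspark.sql.functions import col
df = df.withColumn('price', col('price').cast('decimal(10,2)'))   # cast via withColumn
df = df.selectExpr('user_id', 'cast(num as int) as num')          # cast via SQL expression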

3. Add a column + subtract a time interval
product_export_df = product_export_df.withColumn('export_time_us', product_export_df.create_time - expr('INTERVAL 4 HOURS'))

4. Add a column with a when() condition
store_order_detail_df = store_order_detail_df.withColumn("servicebytrendsi", when((col("fulfillment_service") =='trendsi')|(col("fulfillment_service") =='trendsi-fashion-dropshipping'), 1).otherwise(0))

5. Joining tables (🌟)
store_order_merge  = store_order_df.join(store_order_detail_df,store_order_df.id == store_order_detail_df.store_order_id,how='left')
#----------------
order_detail_total.join(sku_df.select('sku_id','color'),['sku_id'])

6. String concatenation (combining columns)
user_df = user_df.withColumn('user_name',concat(user_df.first_name,lit(" "),user_df.last_name))

7. Setting a date format
user_df = user_df.withColumn('sign_up_date', date_format(user_df.create_time-expr('INTERVAL 4 HOURS'),"MM/dd/yyyy"))
#------------------------
this_month  = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y-%m")

8. Drop rows containing nulls
store_df = store_df.dropna(subset=['user_id'])

9. Filter rows whose column matches specific values
order_df = order_df.filter(col('order_type').isin(1,3))

10. Combining multiple conditions with &
order_df = order_df.filter((~col('user_id').isin(327,166))&(col('pay_time_us')>'2021-04-01'))

11.when…then
order_df_detail = order_df_detail.withColumn('weight',when((col('weight')<=300)|(col('weight').isNull()),300).otherwise(col('weight')))

12. Null checks + single vs double quotes
isNotNull() / isNull(); quoting note: a leading single quote (Scala column syntax) refers to the column itself, while double quotes produce a plain string literal

13. Multiple conditional branches

write_dataframe = write_dataframe.withColumn('status',when(col('deal_id').isNull(),lit("miss")).when(col('user_type').isNull(),lit("extra")).otherwise('match'))


14. Date functions

- timedelta (a time interval): miyaBirthdayNextday = miyaBirthday + datetime.timedelta(days=1)

- datetime.timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0)

- strftime (format a Python date): print(datetime.date.today().strftime('%d/%m/%Y'))

- date_format (format a date column): contacts = contacts.withColumn('date', date_format(col('create_time'),"yyyyMMdd"))


15. Regular-expression string replacement

Reference: REGEXP_REPLACE usage, _JohnnyChu's CSDN blog (regexp_replace)
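A minimal PySpark regexp_replace sketch (the phone column is made up for illustration):
from pyspark.sql.functions import regexp_replace
df = df.withColumn('phone', regexp_replace('phone', '[^0-9]', ''))   # strip everything except digits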


16. Set difference: subtract()
actual = result_mapping.select("user_id").subtract(result_mapping1.select('user_id')).dropDuplicates()

17. union / unionAll (appending DataFrames)

write_dataframe = result_mapping1.union(result_mapping2).union(result_mapping3)
# note: DataFrame.union keeps duplicate rows (UNION ALL semantics); add dropDuplicates() for SQL UNION behavior, see the sketch below
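A minimal sketch of the deduplication step:
all_rows = result_mapping1.union(result_mapping2)   # duplicates are kept
unique_rows = all_rows.dropDuplicates()             # equivalent to SQL UNION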


18. Window functions ✨

Note the import: from pyspark.sql.window import Window

write_dataframe = write_dataframe.withColumn("rank", row_number().over(Window.partitionBy("contact_id").orderBy(desc("spu_sale_amount"))))


19. Timestamp format conversion

create_time >= to_timestamp('2022-01-03 00:00:00', 'yyyy-mm-dd hh24:mi:ss')  -- Postgres-style pattern; the equivalent Spark SQL pattern is 'yyyy-MM-dd HH:mm:ss'


20. explode: split a string/array into rows

from pyspark.sql.functions import explode, split, col

columns = ["Seqno","Name"]
data = [("1", "john jones"),
("2", "tracey smith"),
("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)
df.withColumn("split_name",explode(split(col("Name")," "))).show()


🍑 Coding tips, part two

21. Data type conversion (SQL CAST)
SELECT CONCAT(CAST(round((3/21)*100,3) AS CHAR),'%') as aa from act_canal; 

22. Get the week number from a date
SELECT weekofyear('2022-06-13');

23. Date format conversion: Spark SQL / MySQL / Postgres
 SELECT DATE_FORMAT(NOW(),'yyyy-MM');
 -----mysql-----
 DATE_FORMAT(SYSDATE(),'%Y-%m-%d %H:%i:%s')
 -----postgres-----
 to_timestamp('2022-01-03 00:00:00', 'yyyy-mm-dd hh24:mi:ss')
 
SELECT TO_CHAR(CURRENT_DATE, 'yyyy-mm-dd');

24. Which week a date falls in (week of year / week of month)
SELECT weekofyear('2022-06-13')-23;
#----------------------------------
df_student.withColumn("week_of_month", date_format(col("birthday"), "W")).show()

25. groupBy on multiple columns
store_order_merge.groupBy(['user_id','year_month','year_last_month']).agg(sum(col('price')*col('num')).alias("total_pay_amount"))

26. Join on multiple conditions
write_dataframe = write_dataframe.join(trendsi_sales_china,on=['user_id','year_last_month','year_month'],how='outer')
#-----
df1.join(df2, on=[df1['age'] == df2['age'], df1['sex'] == df2['sex']], how='left_outer')

27. Merging data with unionByName
df1.unionByName(df2).show()

28. Subtract months from a date
store_order_df = store_order_df.withColumn('last_month_pay_time', col('pay_time') - expr('INTERVAL 1 MONTHS'))

29. union vs unionAll vs unionByName
df1.unionByName(df2, allowMissingColumns=True).show(): the column sets may differ; missing columns are filled with null (Spark 3.1+)
df1.unionByName(df2).show(): merges by column name (the column sets must match)

union: appends two DataFrames by column position rather than by name (a.union(b) keeps a's column names and order) and keeps duplicate rows; call dropDuplicates() if you need deduplication
unionAll: same as union (in PySpark the two behave identically)
unionByName: merges by column name instead of position; a short sketch of the three is below
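A minimal sketch (all columns are strings so the positional union succeeds; allowMissingColumns requires Spark 3.1+):
df1 = spark.createDataFrame([('1', 'a')], ['id', 'name'])
df2 = spark.createDataFrame([('b', '2')], ['name', 'id'])
df1.union(df2).show()         # positional: the second row lands as id='b', name='2' (misaligned)
df1.unionByName(df2).show()   # by name: the second row lands as id='2', name='b'
df3 = spark.createDataFrame([('3',)], ['id'])
df1.unionByName(df3, allowMissingColumns=True).show()   # missing 'name' is filled with null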

30. Days in the current month + days elapsed so far this month
from dateutil.relativedelta import relativedelta

start_day_of_next_month = (datetime.now() + relativedelta(months=1)).replace(day=1)
start_day_of_this_month = (datetime.now()).replace(day=1)

this_month_days = (start_day_of_next_month - start_day_of_this_month).days   # number of days in this month
this_month_days_now = (datetime.now() - start_day_of_this_month).days        # full days elapsed since the 1st

31. Count, mean, standard deviation, min, and max (describe)
df = spark.createDataFrame(
    [("Bob", 13, 40.3, 150.5), ("Alice", 12, 37.8, 142.3), ("Tom", 11, 44.1, 142.2)],
    ["name", "age", "weight", "height"],
)
df.describe(['age']).show()

32. Drop duplicate rows
df.dropDuplicates().show()

33. Intersection of two DataFrames
df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
df1.intersectAll(df2).sort("C1", "C2").show()

34. Iterating with mapInPandas (pandas UDF-style function)
from pyspark.sql.functions import pandas_udf
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, df.schema).show()  

35. Merging multiple DataFrames (union in a loop)
# the DataFrames must have the same schema, otherwise the union fails
from functools import reduce
buff = []
for pdfs in [d1, d2, d3]:
    buff.append(pdfs)
mergeDF = reduce(lambda x, y: x.union(y), buff)
mergeDF.show()

36. Dates one week / two weeks / one month back (with a timezone offset)
days7_str = (datetime.now() - timedelta(hours=176, minutes=0)).strftime("%Y-%m-%d")
days7_int = (datetime.now() - timedelta(hours=176, minutes=0)).strftime("%Y%m%d")

days14_str = (datetime.now() - timedelta(hours=344, minutes=0)).strftime("%Y-%m-%d")
days14_int = (datetime.now() - timedelta(hours=344, minutes=0)).strftime("%Y%m%d")

days30_str = (datetime.now() - timedelta(hours=728, minutes=0)).strftime("%Y-%m-%d")
days30_int = (datetime.now() - timedelta(hours=728, minutes=0)).strftime("%Y%m%d")

37. Window functions with over() in PySpark
# define the partitioning and ordering
window = Window.partitionBy(['spu_id']).orderBy(['spu_co'])
# rank within each window, keep rank = 1, then drop the helper column
sku_df = sku_df.withColumn('rank', rank().over(window)).filter("rank = '1'").drop('rank')

38.concat+weekofyear()
t_order_df = t_order_df.withColumn('year_week',concat(concat(year(col('pay_time')),lit('-')),weekofyear(col('pay_time'))))

39. Type conversion with cast() + year-week format
china_order_df_week = china_order_df.filter(col('year_month_week')== concat(lit(this_month),lit('-'),lit(weekofyear(current_date()).cast(StringType())),lit("W")))

🍉 Coding tips, part three

40. Handling array and map columns in PySpark
from pyspark.sql.functions import explode_outer
""" with array """
df.select(df.name,explode_outer(df.knownLanguages)).show()
""" with map """
df.select(df.name,explode_outer(df.properties)).show()

41. Null checks
col.isNotNull() / col.isNull()
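A one-line sketch (the email column is made up for illustration):
from pyspark.sql.functions import col
user_df.filter(col('email').isNotNull())   # keep rows where email is present
user_df.filter(col('email').isNull())      # keep rows where email is missing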

42.Get String length
from pyspark.sql.functions import length
df_books.where(length(col("book_name")) >= 20).show()

43. Rows to columns ✨ (pivot)
from pyspark.sql import functions as F
keys = ['is_jit','after_reason_type','supplier_id']
column = 'after_reason'
column_value = 'value'
column_value_list = ['']
df.groupby(keys).pivot(column, column_value_list).agg(F.first(column_value, ignorenulls=True)).fillna('*')

44. Reading specified partitions ✨ (push_down_predicate)
# dw_order_cut
partitionstate = "dt >='2021-01-01'"
#partitionstate = "dt >= '"+days30_int+"'"
partitionstate1 = "dt = '"+yesterday_int+"'"
partitionstate2 = "time_us_hour = '0'"
partitionstate3 = "dt = '"+cday_int+"'"

source_t_sku = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "hive_t_sku", push_down_predicate = partitionstate1)

45. Dates 7 and 30 days back
days7_str = (datetime.now() - timedelta(hours=((24*7)+8), minutes=0)).strftime("%Y-%m-%d")
days7_int = (datetime.now() - timedelta(hours=((24*7)+8), minutes=0)).strftime("%Y%m%d")

day30_str = (datetime.now() - timedelta(hours=((24*30)+8), minutes=0)).strftime("%Y-%m-%d")
day30_int = (datetime.now() - timedelta(hours=((24*30)+8), minutes=0)).strftime("%Y%m%d")

46. pandas-on-Spark to_string()
import pyspark.pandas as ps  # available from Spark 3.2

df = ps.DataFrame({'col1': [1, 2, 3], 'col2': [4, 5, 6]}, columns=['col1', 'col2'])
print(df.to_string())
   col1  col2
0     1     4
1     2     5
2     3     6

47. Substring extraction ✨ substr() / substring()
b=a.withColumn("Sub_Name",a.Name.substr(1,3)).show()
Reference: https://zhuanlan.zhihu.com/p/34901943
df.withColumn('year', substring('date', 1, 4))\
  .withColumn('month', substring('date', 5, 2))\
  .withColumn('day', substring('date', 7, 2))

48. UDF: yyyyMMdd to yyyy-MM-dd ✨
from datetime import datetime
from pyspark.sql.functions import col, udf, date_format
from pyspark.sql.types import DateType


rdd = sc.parallelize([('20161231',), ('20140102',), ('20151201',), ('20161124',)])
df1 = spark.createDataFrame(rdd, ['old_col'])

# UDF to convert string to date
func = udf(lambda x: datetime.strptime(x, '%Y%m%d'), DateType())

df = df1.withColumn('new_col', date_format(func(col('old_col')), 'MM-dd-yyyy'))

df.show()

write_dataframe = write_dataframe.withColumn('date', to_timestamp('dt', 'yyyyMMdd'))

49. dayofweek()
# dayofweek: Sunday through Saturday map to 1 through 7 (Sunday is 1, Monday is 2, ...)
To get the Monday-first convention, convert it yourself; a Scala demo using when/otherwise follows:
// add days_of_week: day of week with Monday counted as day 1
.withColumn("days_of_week", when(dayofweek(from_unixtime(col("unix_time"), "yyyy-MM-dd")) === 1, 7)
.otherwise(dayofweek(from_unixtime(col("unix_time"), "yyyy-MM-dd")) -1)
.cast(LongType))

50. concat string concatenation
user_tag.withColumn('iso_week_agg',concat(col('iso_year'),lit('-'),col('iso_week')))
A similar function: concat_ws(); see the sketch below.
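A concat_ws() sketch of the same expression (the separator comes first):
from pyspark.sql.functions import concat_ws, col
user_tag = user_tag.withColumn('iso_week_agg', concat_ws('-', col('iso_year'), col('iso_week')))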

51. countDistinct for deduplicated counts
user_tag.groupby(['iso_week_agg']).agg(countDistinct(col('user_id')).alias('user_reg_number'))

52. Handling weeks that span the year boundary
# load the source data
user_tag = source_user_tag.toDF().select('user_id','create_time')
user_tag.show(20)
# build ISO year-week keys that handle the year boundary
iso_weekday = when(dayofweek('create_time') != 1, dayofweek('create_time')-1).otherwise(7)
week_from_prev_year = (month('create_time') == 1) & (weekofyear('create_time') > 9)
week_from_next_year = (month('create_time') == 12) & (weekofyear('create_time') == 1)
iso_year = when(week_from_prev_year, year('create_time') - 1) \
            .when(week_from_next_year, year('create_time') + 1) \
            .otherwise(year('create_time'))
        
user_tag = user_tag.withColumn('iso_year', iso_year)
user_tag = user_tag.withColumn('iso_week', lpad(weekofyear('create_time'), 3, "W0"))
user_tag = user_tag.withColumn('iso_week_agg',concat(col('iso_year'),lit('-'),col('iso_week')))
user_tag.show(20)

53. collect_list vs collect_set

Reference: pyspark collect_list and collect_set examples | Happy Coding Lab (chilunhuang.github.io)

collect_list: aggregates the values of each groupBy key into an array; collect_set does the same but deduplicates; a sketch follows.
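A minimal sketch (the user_id and order_id columns are made up for illustration):
from pyspark.sql.functions import collect_list, collect_set
df.groupBy('user_id').agg(
    collect_list('order_id').alias('all_orders'),       # keeps duplicates
    collect_set('order_id').alias('distinct_orders'))   # deduplicated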


54. lpad & rpad
#### Add both leading and trailing padding
# pad the column to the specified length with the given fill character
df_states = df_states.withColumn('states_Name_new', lpad(df_states.state_name,20, '#'))
df_states = df_states.withColumn('states_Name_new', rpad(df_states.states_Name_new,24, '#'))

df_states.show()

55. replace

Reference: pyspark.sql.DataFrame.replace usage and examples (vimsky.com)
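A hedged sketch of the two common call forms (the country column is made up for illustration):
df = df.replace(['N/A', ''], None, subset=['country'])                       # replace several values with null
df = df.replace({'CN': 'China', 'US': 'United States'}, subset=['country'])  # dict form: old value -> new value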


56. ✨ Cohort analysis by day (retention)
# day-level cohort analysis

# 1. find each customer's first purchase date
t_order = source_t_order.toDF().select('user_id','create_time')
t_order = t_order.withColumn('create_time',date_format(col('create_time'),"yyyy-MM-dd")).filter(col('create_time')>'2022-07-01')
t_order.show(20)

from pyspark.sql.window import Window
# with min(create_time)
t_order = t_order.withColumn('FirstPurchaseDate',min('create_time').over(Window.partitionBy('user_id').orderBy(col('create_time'))))
t_order.show(20)

# 2. compute the return interval (days since the first purchase)
t_order =  t_order.withColumn('ReturnDays',datediff('create_time','FirstPurchaseDate'))
t_order.show(20)

# 3. count customers sharing the same first-purchase date and return interval
t_order = t_order.groupby(['FirstPurchaseDate','ReturnDays']).agg(count('user_id').alias('count_user_id'))
t_order.show(20)

# 4. pivot into the cohort matrix
t_order = t_order.groupby('FirstPurchaseDate').pivot('ReturnDays').agg(first('count_user_id')).orderBy('FirstPurchaseDate')
t_order.show(20)

57. Date subtraction and addition
# get the partition key for N days ago (Athena/Presto)
dt = date_format(date_add('day', -8, current_timestamp), '%Y%m%d')
# Athena + Glue + MySQL
datediff(create_time, delivery_time) <= 7           -- MySQL
date_diff('day', create_time, delivery_time) <= 7   -- Athena

#pgsql
date_part('day',create_time-delivery_time)<=7

# pgsql date add/subtract
to_char(delivery_time + INTERVAL '7 day'-INTERVAL '4 hour','yyyy-mm-dd')='2022-07-18'

# Athena date add/subtract
SELECT date_add('day', 1, timestamp '2022-11-30 10:30:00') AS new_date;
SELECT date_format(date_parse('20230721', '%Y%m%d') - INTERVAL '20' DAY, '%Y%m%d') AS new_date;



58. Substring extraction with SUBSTR
b=a.withColumn("Sub_Name",a.Name.substr(1,3)).show(): take 3 characters starting at position 1

59. Window ranking with over(): rank() / dense_rank() / row_number()
# over(partition by ... order by ...)
from pyspark.sql.window import Window
import pyspark.sql.functions as F

windowSpec  = Window.partitionBy("iso_week_agg").orderBy(F.asc("pay_iso_week_agg"))
order_df = order_df.withColumn("dense_rank", F.dense_rank().over(windowSpec))
order_df.show(50)

🍇 Coding tips, part four

60. Add a null column
test = test.withColumn('order_id',lit(None))

61. Write a detail table to a database over JDBC
t_sku_predict_7days.write.format('jdbc').options(url='jdbc:mysql://prod-trendsi-bi-mysql.cluster-cctv6bkiavlh.us-west-1.rds.amazonaws.com:3306/DW',driver='com.mysql.jdbc.Driver',dbtable='lkb_test_01',user='admin',password='8S5u0tWH2vQCz9bbpn2B').mode('overwrite').save()


62. Common PySpark functions (ceil, floor, round, ...)
https://blog.csdn.net/weixin_39998521/article/details/110598705
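A hedged sketch of a few of the common ones (the weight and price columns are made up for illustration):
from pyspark.sql.functions import ceil, floor, round as spark_round, col
df = df.withColumn('boxes', ceil(col('weight') / 500))          # round up
df = df.withColumn('boxes_floor', floor(col('weight') / 500))   # round down
df = df.withColumn('price_2dp', spark_round(col('price'), 2))   # round to 2 decimal places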

63.get_json_object
#transform sku data
sku_df = source_t_sku.toDF().filter(col('deleted')==0).select(col('id').alias('sku_id'),'sku_code',col('spu_id').alias('spu_id'),'cogs_price','image',col('dimension_values').alias('property'),'barcode','third_scan_code')
sku_df = sku_df.withColumn('color', get_json_object(col("property"),"$.Color").alias("color"))

64. Postgres date format conversion
to_char(date, varchar): select to_char(current_date - 1,'YYYY-MM-dd')
to_date(char, char): to_date('2012-05-01 23:59:59','yyyy-mm-dd hh24:mi:ss')
to_timestamp(char, char)
select cast(to_char(current_date - 1,'YYYY-MM-dd') AS DATE)

-- CloudQuery example
select
pay_time,spu_id,sum(num)
from
lkb_test_skc1 where spu_id=27490 and date_format(pay_time,'%Y-%m-%d')<date'2022-09-08' and date_format(pay_time,'%Y-%m-%d')>date'2021-04-01' group by pay_time,spu_id

65. Split a column with split()
sku_df = sku_df.withColumn("spu_co", split(col("barcode"), "-").getItem(0))

66. PySpark SparkSQL demo (read S3 and update the Athena table metadata)
# this approach is deprecated; an AWS Glue crawler can do the same thing directly
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
import pymysql

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "hello", table_name = "hello_public_t_product", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []

import datetime
from datetime import datetime, timedelta

today_str = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y-%m-%d")
today_int = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y%m%d")

yesterday_str = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y-%m-%d")
yesterday_int = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y%m%d")


''' --- read a partitioned table ---
# read the parquet files
PATH = "s3://xxxxxxx/xxxxx/xxxxxx/"
df = spark.read.parquet(PATH +"dt="+yesterday_int+"/*")
df = df.withColumn('dt',lit(yesterday_int))
# show the DataFrame
df.show(20)

print('begin')


DataSink5 = glueContext.getSink(
    path = PATH, 
    connection_type = "s3", 
    updateBehavior = "UPDATE_IN_DATABASE", 
    partitionKeys = ["dt"], 
    enableUpdateCatalog = True, 
    transformation_ctx = "DataSink5")

DataSink5.setCatalogInfo(
    catalogDatabase = "ods",
    catalogTableName = "ods_t_sku")

from awsglue.dynamicframe import DynamicFrame

NewDynamicFrame = DynamicFrame.fromDF(df, glueContext, "nested")
DataSink5.setFormat("glueparquet")
DataSink5.writeFrame(NewDynamicFrame)

print('end')
'''



# --- non-partitioned ----
# read the parquet files
PATH = "s3://xxxxxx/xxx/xx/xxxxxxx/"
df = spark.read.parquet(PATH +"*")

# show the DataFrame
df.show(20)

print('begin')


DataSink5 = glueContext.getSink(
    path = PATH, 
    connection_type = "s3", 
    updateBehavior = "UPDATE_IN_DATABASE", 
    enableUpdateCatalog = True, 
    transformation_ctx = "DataSink5")

DataSink5.setCatalogInfo(
    catalogDatabase = "ods",
    catalogTableName = "ods_t_carton")

from awsglue.dynamicframe import DynamicFrame

NewDynamicFrame = DynamicFrame.fromDF(df, glueContext, "nested")
DataSink5.setFormat("glueparquet")
DataSink5.writeFrame(NewDynamicFrame)

print('end')


# query = '''
#     SELECT *
#     FROM hello.xxxxxx
#     LIMIT 5;
# '''
# 
# result = spark.sql(query)
# result.show()

67. Get a column's data type
# get the datatype of birthday
df_student.select("birthday").dtypes

68. Change a column's type (astype / cast) and check dtypes
df = df.withColumn("kills",df.kills.astype("int"))
df.select("kills").dtypes

69.lag & lead
https://zhuanlan.zhihu.com/p/202969159
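A minimal sketch, assuming order_df has the user_id and pay_time columns used in other tips here:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead
w = Window.partitionBy('user_id').orderBy('pay_time')
order_df = order_df.withColumn('prev_pay_time', lag('pay_time', 1).over(w))   # previous row in the window
order_df = order_df.withColumn('next_pay_time', lead('pay_time', 1).over(w))  # next row in the window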

70. Cumulative sum over a column
https://blog.csdn.net/XnCSD/article/details/90676259
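A minimal sketch of a per-user running total (the total_pay_amount column is an assumption borrowed from the groupBy example in tip 25):
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as spark_sum
w = (Window.partitionBy('user_id').orderBy('pay_time')
       .rowsBetween(Window.unboundedPreceding, Window.currentRow))
order_df = order_df.withColumn('running_total', spark_sum('total_pay_amount').over(w))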

71. Python notification (email) script
import sys
import smtplib
import datetime
from email.mime.text import MIMEText

# mail settings
mail_host = "smtp.163.com" # SMTP server address
mail_port = 465            # SMTP server port
mail_user = "xxxxxxxx@163.com"
mail_pass = "xxxxxxxxxx"
mail_to = ["xxxxxxxx@qq.com","xxxxxxx@qq.com"] # multiple recipients as a list

# mail content
now = datetime.datetime.now()
now = now + datetime.timedelta(hours=8) # shift to China time (UTC+8)
print(now)

if now.hour >= 0 and now.hour < 2:
    msg = MIMEText("xxxxxxxx") # mail body
elif now.hour >= 10 and now.hour < 12:
    msg = MIMEText("xxxxxxx") # mail body
else:
    msg = MIMEText("xxxxxxx") # mail body
msg['From'] = mail_user
msg['To'] = ",".join(mail_to) # join the recipient addresses with commas
msg['Subject'] = "随便"

# send the mail
smtp = smtplib.SMTP_SSL(mail_host, mail_port)
smtp.login(mail_user, mail_pass)
smtp.sendmail(mail_user, mail_to, msg.as_string())
smtp.quit()
print('success')


72. dropna: drop rows containing nulls
https://blog.csdn.net/qq_39954916/article/details/120584754
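A minimal sketch of the main dropna parameters:
df.dropna()                      # drop rows containing any null
df.dropna(how='all')             # drop rows where every column is null
df.dropna(thresh=2)              # keep rows with at least 2 non-null values
df.dropna(subset=['user_id'])    # only consider user_id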

73. QuickSight: string to date 🪐
parseDate(concat(substring({dt},1,4),'-',substring({dt},5,2),'-',substring({dt},7,2)),'yyyy-MM-dd')

74. Parse URLs in PySpark
t_traffic_detail = t_traffic_detail.withColumn('utm_campaign' ,expr("parse_url(url,'QUERY', 'utm_campaign')"))

75. Read only part of a table over JDBC with a pushed-down query, for efficiency ✨
print('start')
#store_product = glueContext.create_dynamic_frame.from_catalog(database = "hello", table_name = "trendsi_public_t_store_product", transformation_ctx = "datasource0")

query= "(select * from t_test where update_time >= (date(now()) - interval  '1 day' +interval '4 hour') and update_time <(date(now()) +interval '4 hour')) as testresult"
store_product = spark.read.format("jdbc").option("url", "jdbc:postgresql:xxxxxxxxxx").option("driver", "org.postgresql.Driver").option("dbtable", query).option("user", "xxxxxx").option("password", "xxxxxxxxxx").load()
print('success')

76. Athena explode (UNNEST) 🪐
-- 1. self-contained example, runnable as-is
WITH dataset AS (
  SELECT
    'engineering' as department,
    ARRAY['Sharon', 'John', 'Bob', 'Sally'] as users
)
SELECT department, names FROM dataset
CROSS JOIN UNNEST(users) as t(names)
-- 2. a real-world example
select user_id,source from 
(select user_id,split(custom_source,';') cs from hive_dw_user_new_tag where length(custom_source)>12 limit 20)
cross join unnest(cs) as t(source)

77. RDD intersection
x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['A','C','D'])
z = x.intersection(y)
print(x.collect())
print(y.collect())
print(z.collect())

78. Take individual elements out of an array column
# add image columns
wms_abnormal = source_wms_abnormal.toDF().select('id','image')
wms_abnormal = wms_abnormal.withColumn('image', regexp_replace('image', '([\[\]"]+)', ''))
wms_abnormal = wms_abnormal.withColumn("split_image",split(col("image"),","))
wms_abnormal = wms_abnormal.withColumn('image1', col('split_image').getItem(0)) 
wms_abnormal = wms_abnormal.withColumn('image2',  col('split_image').getItem(1))
wms_abnormal = wms_abnormal.withColumn('image3',  col('split_image').getItem(2))
wms_abnormal = wms_abnormal.drop('split_image')

79. Collapse duplicate records into one row (collect_list + concat_ws)
from pyspark.sql.functions import collect_list

shopify_store_url = left_join.groupby('user_id').agg(collect_list('domain').alias("shopify_store_url"))

shopify_store_url = shopify_store_url.withColumn("shopify_store_url",concat_ws(",",col("shopify_store_url")))

🧆 Coding tips, part five

80.Glue read S3 to DataFrame🪐
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import *


## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "prod-bi", table_name = "biods_t_user", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []


import datetime
from datetime import datetime, timedelta
from pyspark.sql.types import StringType


recover_day =datetime.now()- timedelta(hours=4+24*0, minutes=0)

today_str = (recover_day).strftime("%Y-%m-%d")
today_int = (recover_day).strftime("%Y%m%d")

yesterday_str = (recover_day - timedelta(hours=24, minutes=0)).strftime("%Y-%m-%d")
yesterday_int = (recover_day - timedelta(hours=24, minutes=0)).strftime("%Y%m%d")

print('start')

from pyspark.sql import SparkSession

df = spark.read.options(header='True', inferSchema='True', delimiter='&') \
  .csv("s3://xxxxxx/xx/xxx-emp")

df.printSchema()
df.show(20)

81. Cast to DecimalType
from pyspark.sql.types import DecimalType
order_df = order_df.withColumn('us_delivery_cost',when((col('package_weight').isNull())&(col('logistics_company_id').isin(1132,25)),lit(10.95).cast(DecimalType(10,2))).otherwise(col('us_delivery_cost')))


82. Athena URL-handling functions ✨
select url_extract_parameter(url,'curPage') from traffic.xxxxxx

83. Athena: extract array elements ✨
select case  when cardinality(tmp)=3 then concat(tmp[1],tmp[2],tmp[3]) 
             when cardinality(tmp)=2 then concat(tmp[1],tmp[2]) 
             when cardinality(tmp)=1 then tmp[1] end category123  from
(select split(event_value, '/') tmp from traffic.xxxxx where  event_target='page' and event_type='imp' and page_code='product_listing_page'   limit 20) a

84. Logical negation in PySpark (~)
order_df = order_df.filter((~col('user_id').isin(327,166))&(col('pay_time')>'2021-04-01')&(col('pay_time')<today_str))


85. Create a small throwaway DataFrame
from pyspark.sql.functions import last_day

df = spark.createDataFrame([('1997-02-10',)], ['d'])
df.select(last_day(df.d).alias('date')).show()

86. Limit the QuickSight data time range (via SQL)
select distinct * from report_inventory_kpi where  (dt >= date_format(add_months(current_timestamp,-2),'yyyyMMdd'))

87. About weekofyear in PySpark
pyspark.sql.functions.weekofyear follows the ISO-8601 convention (a week starts on Monday, and week 1 is the first week with more than 3 days of the new year); it takes only a date column and has no firstDayOfWeek parameter. If you need a different week-start convention, derive it yourself, as sketched below.
from pyspark.sql.functions import weekofyear

df.select(weekofyear('date'))
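A hedged workaround for a Sunday-start week number: shifting the date forward by one day makes a Sunday pick up the following ISO week, which matches a Sunday-start convention (year-boundary handling still follows the ISO rules):
from pyspark.sql.functions import weekofyear, date_add, col
df = df.withColumn('iso_week', weekofyear(col('date')))                  # Monday-start (ISO)
df = df.withColumn('sunday_week', weekofyear(date_add(col('date'), 1)))  # Sunday-start approximation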


88. Athena: get the ISO year-week
select concat(date_format(date_add('day', -1 ,date '2023-01-03'), '%x-%v'),'W')

To be continued… 🎈
