# EMR Studio - Lab 2

### 1、导入库

In [None]:
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import to_date, col

### 2、从S3读取数据

In [None]:
# Load the raw sales CSV from S3: the first row supplies the column names and
# Spark infers each column's type from the data. Preview the first rows.
df = (
    spark.read
    .format('csv')
    .option('header', 'True')
    .option("inferSchema", "true")
    .load('s3://emr-dev-exp-930743317779/data/sales.csv')
)
df.show()

### 3、发起数据转换

以下命令进行如下转换：

- 字段名首字母大写转为小写
- 字段名中的空格替换为下划线
- `Order Date`字段从 STRING 类型转换为 DATE 类型
- `Units Sold`、`Unit Price`、`Total Cost`、`Total Profit`和`Total Revenue`字段从 STRING 类型转换为 FLOAT 类型
- 丢弃如下字段：`Order Priority`, `Ship Date`, `Unit Cost`
- 对`Order ID`字段实施去重

In [None]:
# Clean the raw sales data:
#   * rename columns to lower snake_case (e.g. "Order ID" -> "order_id"),
#   * parse "Order Date" strings with the pattern M/d/yyyy into DATE values,
#   * cast the quantity / price / amount columns to FLOAT,
#   * drop "Order Priority", "Ship Date" and "Unit Cost",
#   * remove duplicate rows.
# NOTE(review): FLOAT is 32-bit (~7 significant digits), so large monetary
# totals can lose cents; consider "double" if exact amounts matter downstream.
df_final = (
    df.withColumn("order_id", col("Order ID"))
    .withColumn("order_date", to_date(col("Order Date"), "M/d/yyyy"))
    .withColumn("item_type", col("Item Type"))
    .withColumn("sales_channel", col("Sales Channel"))
    .withColumn("units_sold", col("Units Sold").cast("float"))
    .withColumn("unit_price", col("Unit Price").cast("float"))
    .withColumn("total_cost", col("Total Cost").cast("float"))
    .withColumn("total_profit", col("Total Profit").cast("float"))
    .withColumn("total_revenue", col("Total Revenue").cast("float"))
    # Drop the source columns that were just replaced, plus the unused ones.
    .drop(
        "Order ID", "Order Date", "Item Type", "Sales Channel",
        "Units Sold", "Unit Price", "Total Cost", "Total Profit",
        "Total Revenue", "Order Priority", "Ship Date", "Unit Cost",
    )
    # NOTE(review): distinct() removes only fully identical rows; if the intent
    # is one row per order, dropDuplicates(["order_id"]) would be needed instead.
    .distinct()
)
# Apply the same naming rule to the columns not handled above (e.g. "Region",
# "Country") so the whole schema is lower snake_case and later references such
# as partitionBy("region") match exactly rather than via case-insensitive lookup.
df_final = df_final.toDF(*[c.lower().replace(" ", "_") for c in df_final.columns])
df_final.show(5)

数据处理完毕。

### 4、查看处理后的数据结构验证处理结果

In [None]:
# Print the column names and types of the cleaned DataFrame to verify the transformation.
df_final.printSchema()

可看到处理后的字段与预期结果一致。

### 5、创建临时视图

In [None]:
# Register the cleaned DataFrame as a temporary view so it can be queried with
# Spark SQL, then preview 5 rows through that view.
df_final.createOrReplaceTempView('df_final_View')
view_preview = spark.sql("select * from df_final_View")
view_preview.show(5)

### 6、重新分区为2个分区并保存到同一目录

In [None]:
# Shuffle the cleaned data into exactly 2 partitions (so at most 2 part files)
# and write them into a single output directory, replacing any previous output.
# The format is stated explicitly as Parquet instead of relying on the session
# default (spark.sql.sources.default), matching the "_parquet" directory name.
df_final.repartition(2).write.mode("overwrite").parquet(
    "s3://emr-dev-exp-930743317779/data/output/sales/sales_final_parquet"
)

### 7、按Region字段生成分区并分目录保存

In [None]:
# Write Parquet output split into one sub-directory per distinct "region"
# value, overwriting any previous output at this path.
(
    df_final.write
    .partitionBy("region")
    .mode("overwrite")
    .parquet("s3://emr-dev-exp-930743317779/data/output/sales/sales_region_final_parquet")
)

### 8、查看国家字段的数据

In [None]:
# Preview the country column (show() prints 20 rows by default).
# Uses the lower snake_case column name, consistent with the rest of the cleaned schema.
df_final.select('country').show()

---