Great Expectations (GE): Installing and Using It for Data Quality Checks


Contents

  • Software Introduction
  • Environment
  • Installing GE
  • Using GE
    • The Conventional Workflow
    • In-Memory Mode
      • Import Dependencies
      • Create the GE Context
      • Configure the Context Datasource
      • Build the DataFrame
      • Create the Batch Request
      • Create the Suite
      • Create the Validator
      • Define Validation Rules
      • Run the Validation and Print the Results
      • Sample Result
  • DEMO


Software Introduction

Great Expectations is a tool for checking data quality ("data health").

Docs: https://greatexpectations.io/

It supports quality checks on data in databases, file systems, cloud storage, and in memory.

Supported databases: Athena, BigQuery, MSSQL, MySQL, PostgreSQL, Redshift, Snowflake, SQLite

File systems can be accessed through either the Spark or the Pandas driver

Cloud storage can be accessed through Spark or Pandas: AWS S3, GCS (Google), Azure Blob Storage (Microsoft)

In-memory data can be checked through Spark or Pandas.

GE can run data quality checks such as the following (sketched in code right after this list):

1. Null checks on a column

2. Value-range checks on a column

3. Row-count threshold checks

...

(265 expectation types are supported in total; see https://greatexpectations.io/expectations/)
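For example, the three checks above correspond to built-in expectations roughly as sketched here. This assumes a validator object like the one constructed later in this article; the column name "score" and the thresholds are purely illustrative:

# 1. Null check: the column must not contain null values
validator.expect_column_values_to_not_be_null("score")
# 2. Value-range check: every value must fall within [0, 100]
validator.expect_column_values_to_be_between("score", min_value=0, max_value=100)
# 3. Row-count threshold check: the table must have between 1,000 and 5,000,000 rows
validator.expect_table_row_count_to_be_between(min_value=1000, max_value=5000000)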

This article only covers how to use GE together with Spark for in-memory data quality checks.

Let's start with the installation.

Environment

OS: macOS or Red Hat 7.3.1-13

Software: Python 3.7 (version 3.6.1 or above is recommended; older versions may fail to run)

Dependencies: pip/pip3, ruamel_yaml, scipy, matplotlib, python-ldap, typing, git, Spark

Installing GE

Installing GE is very simple.

Clone the GE tutorials repository and install GE with pip:

git clone https://github.com/superconductive/ge_tutorials
cd ge_tutorials
pip install great_expectations

Check whether the installation succeeded; if the command prints nothing, reinstall all the dependencies with pip3:

great_expectations --version

If the installation succeeded, you will get output similar to the following:
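For example (the exact version string depends on the release you installed; this line is only illustrative):

great_expectations, version 0.15.34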

Using GE

The Conventional Workflow

GE's basic workflow is as follows:

  • Create a datasource connection; this is very straightforward if your source is a database GE supports
  • Create a suite and configure the validation rules; basic rules can be generated automatically from the datasource, which is very easy
  • Create a checkpoint, the validation definition that GE executes; it references the datasource and the suite

Once these three steps are done, running great_expectations checkpoint run <checkpoint_name> executes the validation and produces the results, as in the CLI sketch below.
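A hedged sketch of those CLI steps (the commands are from the v3-style GE CLI; exact prompts and arguments vary between GE versions, and the checkpoint name my_checkpoint is just an example):

great_expectations init                            # scaffold a great_expectations/ project directory
great_expectations datasource new                  # interactively create and save a datasource
great_expectations suite new                       # interactively create an expectation suite
great_expectations checkpoint new my_checkpoint    # wire a datasource + suite into a checkpoint
great_expectations checkpoint run my_checkpoint    # execute the validation and report the results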

In-Memory Mode

Import Dependencies

Open an empty Python file and import the dependencies:

from ruamel import yaml
from pyspark.sql import SparkSession
from typing import Any, Dict, List, Optional
import great_expectations as ge
from great_expectations.core.batch import BatchRequest, RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
 DataContextConfig,
 InMemoryStoreBackendDefaults,
)
from great_expectations.core.expectation_configuration import ExpectationConfiguration

Create the GE Context
store_backend_defaults = InMemoryStoreBackendDefaults()
data_context_config = DataContextConfig(
 store_backend_defaults=store_backend_defaults,
 checkpoint_store_name=store_backend_defaults.checkpoint_store_name,
)
context = BaseDataContext(project_config=data_context_config)

Configure the Context Datasource
datasource_yaml = f"""
name: my_spark_dataframe
class_name: Datasource
execution_engine:
 class_name: SparkDFExecutionEngine
 force_reuse_spark_context: true
data_connectors:
 default_runtime_data_connector_name:
  class_name: RuntimeDataConnector
  batch_identifiers:
   - batch_id
"""
context.test_yaml_config(datasource_yaml)
context.add_datasource(**yaml.load(datasource_yaml))

Build the DataFrame
df = SparkSession.builder.appName('leo_test_load') \
 .enableHiveSupport() \
 .getOrCreate() \
 .sql("select * from beta.click where dt = '20220518'")

Create the Batch Request
batch_request = RuntimeBatchRequest(
 datasource_name="my_spark_dataframe",
 data_connector_name="default_runtime_data_connector_name",
 data_asset_name="asset",  
 batch_identifiers={"batch_id": "default_identifier"},
 runtime_parameters={"batch_data": df},  
)

Create the Suite
suite = context.create_expectation_suite(
 expectation_suite_name="test_suite", overwrite_existing=True
)

Create the Validator
validator = context.get_validator(
 batch_request=batch_request, expectation_suite_name="test_suite"
)

Define Validation Rules

You can define multiple rules; a second illustrative rule is sketched right after the first one below.

kargs: Dict[str, Any] = {"column": "score"}
kargs["min_value"] = 0
kargs["max_value"] = 100
expectation = ExpectationConfiguration("expect_column_values_to_be_beteen", kargs)
validator.append_expectation(expectation)
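A second rule, added the same way (the expectation type expect_column_values_to_not_be_null is another built-in; the column name simply reuses "score" from above):

# Illustrative second rule: "score" must never be null
kwargs = {"column": "score"}
expectation = ExpectationConfiguration("expect_column_values_to_not_be_null", kwargs)
validator.append_expectation(expectation)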

Run the Validation and Print the Results
print(validator.validate().results)
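If you only need the overall pass/fail flag rather than the per-expectation details, the object returned by validate() also exposes a success attribute (a minimal sketch):

results = validator.validate()
print(results.success)   # True only if every expectation passed
print(results.results)   # per-expectation details, as shown below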

Sample Result
[{
  "result": {
 "element_count": 3199059,
 "unexpected_count": 322274,
 "unexpected_percent": 10.212081775492893,
 "partial_unexpected_list": [
   106,
   160,
   107,
   122,
   344,
   476,
   378,
   203,
   106,
   183,
   101,
   117,
   106,
   121,
   110,
   101,
   109,
   147,
   242,
   155
 ],
 "missing_count": 43248,
 "missing_percent": 1.3518975423710535,
 "unexpected_percent_total": 10.074024892945081,
 "unexpected_percent_nonmissing": 10.212081775492893
  },
  "exception_info": {
 "raised_exception": false,
 "exception_traceback": null,
 "exception_message": null
  },
  "meta": {},
  "suess": false,
  "expectation_config": {
 "kargs": {
   "column": "itemindex",
   "min_value": 0,
   "max_value": 100,
   "batch_id": "d628c5b9f32bdc923d4d1805503a477d"
 },
 "expectation_type": "expect_column_values_to_be_beteen",
 "meta": {}
  }
}]

DEMO

A demo that runs locally:

from ruamel import yaml
from pyspark.sql import SparkSession
from typing import Any, Dict, List, Optional
import great_expectations as ge
from great_expectations.core.batch import BatchRequest, RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
 DataContextConfig,
 InMemoryStoreBackendDefaults,
)

from great_expectations.core.expectation_configuration import ExpectationConfiguration

# basic dataframe
data = [
 {"a": 1, "b": 2, "c": 3},
 {"a": 4, "b": 5, "c": 6},
 {"a": 7, "b": 8, "c": 9},
]
sparksession = SparkSession.builder.master("local[1]") \
 .appName('ge_demo') \
 .getOrCreate()

df = sparksession.createDataFrame(data=data)
#df = sparksession.read.parquet("file:///Users/cl10189-m/greate_expectations/part-00000-2523d045-ab36-43ef-9b62-0101d0530cd9-c000.snappy.parquet")
# NOTE: InMemoryStoreBackendDefaults SHOULD NOT BE USED in normal settings. You
# may experience data loss as it persists nothing. It is used here for testing.
# Please refer to docs to learn how to instantiate your DataContext.
store_backend_defaults = InMemoryStoreBackendDefaults()
data_context_config = DataContextConfig(
 store_backend_defaults=store_backend_defaults,
 checkpoint_store_name=store_backend_defaults.checkpoint_store_name,
)
context = BaseDataContext(project_config=data_context_config)

datasource_yaml = f"""
name: my_spark_dataframe
class_name: Datasource
execution_engine:
  class_name: SparkDFExecutionEngine
  force_reuse_spark_context: true
data_connectors:
  default_runtime_data_connector_name:
   class_name: RuntimeDataConnector
   batch_identifiers:
    - batch_id
"""

context.test_yaml_config(datasource_yaml)

context.add_datasource(**yaml.load(datasource_yaml))

# Here is a RuntimeBatchRequest using a dataframe
batch_request = RuntimeBatchRequest(
 datasource_name="my_spark_dataframe",
 data_connector_name="default_runtime_data_connector_name",
 data_asset_name="ge_asset",  # This can be anything that identifies this data_asset for you
 batch_identifiers={"batch_id": "default_identifier"},
 runtime_parameters={"batch_data": df},  # Your dataframe goes here
)

suite = context.create_expectation_suite(
 expectation_suite_name="test_suite", overrite_existing=True
)

validator = context.get_validator(
 batch_request=batch_request, expectation_suite_name="test_suite"
)

kargs: Dict[str, Any] = {"column": "a"}
kargs["min_value"] = 1
kargs["max_value"] = 6

expectation = ExpectationConfiguration("expect_column_values_to_be_beteen", kargs)
validator.append_expectation(expectation)

kargs: Dict[str, Any] = {"column": "b"}

expectation = ExpectationConfiguration("expect_column_values_to_be_unique", kargs)
validator.append_expectation(expectation)

print(validator.validate().results)
sparksession.stop()
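To run the demo locally, assuming great_expectations and pyspark are installed in the same Python environment and the script above is saved as ge_demo.py (the file name is arbitrary):

pip3 install great_expectations pyspark
python3 ge_demo.py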
