DataSource—通用数据类型
由small_q创建,最终由small_q 被浏览 262 用户
DataSource
DataSource是bigmodule原生支持的一种泛用数据类型,在底层实现了许多优化机制,以确保数据准确、安全、便捷地传输和使用是。
\
导入DataSource
DataSource相关的方法和属性,定义在库 dai 中,通过以下代码进行导入:
import dai
\
DataSource对象与datasource_id
每个DataSource对象都有一个与其相匹配的唯一ID——datasource_id。
可以通过datasource_id来获取相应的DataSource对象。
DataSource对象 = dai.DataSource("datasource_id")
\
DataSource中的属性
属性 | 说明 |
---|---|
type | 获取DataSource的类型 |
metadata | 获取DataSource的元数据 |
\
type
获取DataSource的类型。
基本用法:
DataSource对象.type
返回值类型:str
使用例:
dai.DataSource("ds_bdb").type # bdb
dai.DataSource("ds_pickle").type # pickle
dai.DataSource("ds_binary").type # binary
dai.DataSource("ds_text").type # text
dai.DataSource("ds_json").type # json
\
metadata
获取DataSource的元数据。
基本用法:
DataSource对象.metadata
返回值类型:json
使用例:
dai.DataSource("holidays").metadata
# {'type': 'bdb', 'schema': {'date': 'timestamp[ns]', 'instrument': 'string', 'score': 'double', 'position': 'int64'}, 'partitioning': [], 'indexes': [], 'unique_together': ['date', 'instrument'], 'on_duplicates': 'last', 'preserve_pandas_index': False, 'sort_by': [], 'extra': ''}
\
DataSource中的方法
方法 | 说明 |
---|---|
read() | 自动判断DataSource的类型,获取相应的数据 |
exists() | 判断DataSource是否存在,存在返回True,不存在则返回False |
write_bdb() | 创建bdb新表 |
read_bdb() | 读取bdb表 |
insert_bdb() | 更新和插入数据行到bdb表 |
apply_bdb() | 按分区(partition)处理数据,可以用于数据增删查改 |
check_bdb() | 检查数据源是否存在异常 |
write_pickle() | 写入pickle数据 |
read_pickle() | 读取pickle数据 |
write_text() | 写入文本数据 |
read_text() | 读取文本数据 |
write_json() | 写入json数据 |
read_json() | 读取json数据 |
write_binary() | 写入二进制数据 |
read_binary() | 读取二进制数据 |
delete() | 删除数据 |
write_zero() | 创建一个空的DataSource,并由您自己管理 |
mount() | 挂载DataSource目录,只支持 zero DataSource挂载 |
unmount() | 卸载挂载的DataSource目录 |
save_view() | 保存视图类DataSource |
persist() | 持久化缓存DataSource |
\
read
自动判断DataSource的类型,返回相应的数据。
基本用法:
变量标识符 = DataSource对象.read()
使用例:
a = ds_bdb.read() # 读取bdb类型的dataSource对象, 返回一个具有表结构的数据对象
b = ds_pickle.read() # 读取pickle类型的dataSource对象, 返回一个pickle数据对象
c = ds_binary.read() # 读取二进制类型的dataSource对象, 返回一个二进制数据对象
d = ds_text.read() # 读取文本类型的dataSource对象, 返回一个文本数据对象
e = ds_json.read() # 读取json类型的dataSource对象, 返回一个json数据对象
\
exists
判断DataSource是否存在,存在返回True,不存在则返回False。
基本用法:
DataSource对象.exists()
返回值类型:bool
使用例:
dai.DataSource("ds_test").exists() # True or False
\
write_bdb
创建bdb新表。
参数列表:
参数 | 类型 | 说明 |
---|---|---|
data | Union[pa.Table, pd.DataFrame, ds.Dataset] | 数据 |
id | str | 数据源ID,默认为None |
partitioning | List[str] | 分区字段,默认为None |
indexes | List[str] | 索引字段,默认为None |
excludes | Set[str] | 排除字段,默认为None |
unique_together | List[str] | 唯一约束字段,默认为None |
on_duplicates | str | 重复数据处理策略,默认为last |
sort_by | List[Tuple[str, str]] | 排序字段,默认为None |
preserve_pandas_index | bool | 是否保留pandas索引,默认为False |
docs | Dict[str, Any] | 数据源文档,默认为None。 |
timeout | int | 锁超时时间,默认为300秒。 |
extra | str | 写入元数据的额外信息,默认为空字符串。 |
overwrite | bool | 覆盖已有数据,分区数据不支持覆盖。 |
返回值类型:DataSource对象
使用例:
import numpy as np
import pandas as pd
import dai
instruments = ["000001.SZ", "000002.SZ", "000003.SZ"]
dates = list(pd.date_range("2021-12-29", "2022-01-02"))
df = pd.DataFrame({"instrument": instruments * len(dates), "date": dates * len(instruments), "value1": np.random.random(len(dates) * len(instruments))})
# 可选:定义分区,可以用于数据访问加速
df[dai.DEFAULT_PARTITION_FIELD] = df["date"].dt.year
# 如果这里抛出 ArrowInvalid: Object is not allowed to access 的错误,请您修改id
dai.DataSource.write_bdb(
data=df,
# datasource id是全局唯一的,支持小写字母、数字、下划线,以字母开始
id="ds_test",
# 数据插入时,根据unique_together如果有重复的,会去重.如果有分区,则需要传入索引参数indexes
unique_together=["date", "instrument"],
indexes=["date"],
)
\
read_bdb
读取bdb表。
参数列表:
参数 | 类型 | 说明 |
---|---|---|
as_type | Union[pa.Table, pd.DataFrame, ds.Dataset] | 返回类型,默认为pa.Table |
partition_filter | Dict[str, Union[tuple, set]] | 分区过滤条件,默认为None |
columns | List[str] | 返回部分列,默认为None |
返回值类型:
返回值类型 | 说明 |
---|---|
pa.Table | 当as_type为pa.Table时返回。 |
pd.DataFrame | 当as_type为pd.DataFrame时返回。 |
ds.Dataset | 当as_type为ds.Dataset时返回。 |
使用例:
import pandas as pd
import dai
df_result = dai.DataSource("ds_test").read_bdb(as_type=pd.DataFrame)
df_result_with_filter = dai.DataSource("ds_test").read_bdb(as_type=pd.DataFrame, partition_filter={
"date": ("2023-01-01", "2023-01-15"),
"instrument": {"000002.SZ"}
})
df_result_with_columns = dai.DataSource("ds_test").read_bdb(as_type=pd.DataFrame, partition_filter={
"date": ("2023-01-01", "2023-01-15"),
"instrument": {"000002.SZ"}
}, columns=["instrument", "date"])
\
insert_bdb
更新和插入数据行到bdb表。
参数列表:
参数 | 类型 | 说明 |
---|---|---|
data | Union[pa.Table, pd.DataFrame, ds.Dataset] | 数据,支持类型为pa.Table、pd.DataFrame、ds.Dataset。 |
excludes | Set[str] | 需要排除的字段,默认为None。 |
timeout | int | 超时时间,默认为300秒 |
返回值类型: DataSource对象
使用例:
import numpy as np
import pandas as pd
import dai
ds = dai.DataSource("ds_test")
instruments = ["000001.SZ", "000002.SZ", "000003.SZ"]
dates = list(pd.date_range("2022-01-05", "2022-01-15"))
df = pd.DataFrame({"instrument": instruments * len(dates), "date": dates * len(instruments), "value1": np.random.random(len(dates) * len(instruments))})
df[dai.DEFAULT_PARTITION_FIELD] = df["date"].dt.year
ds.insert_bdb(df)
\
apply_bdb
按partition处理数据,可以用于数据增删查改。
参数列表:
参数 | 类型 | 说明 |
---|---|---|
func | Callable[[Union[pa.Table, pd.DataFrame, ds.Dataset]], Optional[Union[pa.Table, pd.DataFrame, ds.Dataset]]] | 数据处理函数。输入为当前分区的数据。返回数据,如果长度大于0,则使用返回数据更新分区;如果长度为0,则删除当前分区;如果为None,则不对分区做处理。 |
as_type | Union[pa.Table, pd.DataFrame, ds.Dataset] | 数据插入类型,默认为pa.Table |
partition_filter | Dict[str, Union[tuple, set]] | 分区过滤条件,默认为None。 |
timeout | int | 超时时间,默认为300秒 |
返回值类型: DataSource对象
使用例:
import numpy as np
import pandas as pd
import dai
ds = dai.DataSource("ds_test")
def delete_000001(df):
df = df[df["instrument"] != "000001.SZ"]
return df
def insert_value2(df):
df["value2"] = np.random.random(len(df))
return df
def remove_value2(df):
del df["value2"]
return df
def delete_all(df):
return pd.DataFrame()
# 删除行
ds.apply_bdb(delete_000001, as_type=pd.DataFrame)
# 插入列
ds.apply_bdb(insert_value2, as_type=pd.DataFrame)
# 删除列
ds.apply_bdb(remove_value2, as_type=pd.DataFrame)
# 指定 partition_filter 可以只处理某些分区的数据以提高效率,例如删除 2022 分区的所有数据
ds.apply_bdb(
delete_all,
as_type=pd.DataFrame,
partition_filter={
"date": ("2022-01-01", "2023-01-01")
}
)
\
check_bdb
检查数据源是否存在异常。
参数列表:
参数 | 类型 | 说明 |
---|---|---|
delete_invalid_data | bool | 是否删除异常数据,默认为False。 |
返回值类型:无返回值
使用例:
dai.DataSource("ds_test").check_bdb()
# 删除异常数据,后续再补充数据
dai.DataSource("ds_test").check_bdb(delete_invalid_data=True)
\
write_pickle
写入pickle数据。
参数列表:
参数 | 类型 | 说明 |
---|---|---|
data | PyObject | 数据,可以任意的python数据类型 |
id | str | 数据id,默认为None,写入到cache数据 |
overwrite | bool | 覆盖已有数据,默认为False |
返回值类型:DataSource对象
使用例:
import dai
data = {"a": 1, "b": 2}
dai.DataSource.write_pickle(data, id="ds_pickle")
\
read_pickle
读取pickle数据。
基本用法:
DataSource对象.read_pickle()
返回值类型:PyObject
使用例:
import dai
dai.DataSource("ds_pickle").read_pickle()
# {'a': 1, 'b': 2}
\
write_text
写入文本数据。
参数列表:
参数 | 类型 | 说明 |
---|---|---|
data | str | 数据 |
id | str | 数据id,默认为None,写入到cache数据 |
overwrite | bool | 覆盖已有数据,默认为False |
返回值类型:DataSource对象
使用例:
import dai
text = "Hello, world!"
dai.DataSource.write_text(text, id="ds_text")
\
read_text
读取文本数据。
基本用法:
DataSource对象.read_text()
返回值类型:str
使用例:
import dai
dai.DataSource("ds_text").read_text()
# 'Hello, world!'
\
write_json
写入json数据。
参数列表:
参数 | 类型 | 说明 |
---|---|---|
data | any | 数据,具有 json 的结构 |
id | str | 数据id,默认为None,写入到cache数据 |
overwrite | bool | 覆盖已有数据,默认为False |
返回值类型:DataSource对象
使用例:
import dai
json = {"a": 1, "b": 2, "c": [1, 2, 3]}
dai.DataSource.write_json(json, id="ds_json")
\
read_json
读取json数据。
基本用法:
DataSource对象.read_json()
返回值类型:any
使用例:
import dai
dai.DataSource("ds_json").read_json()
# {"a": 1, "b": 2, "c": [1, 2, 3]}
\
write_binary
写入binary数据。
参数列表:
参数与 | 类型 | 说明 |
---|---|---|
data | bytes | 数据,具有 json 的结构 |
id | str | 数据id,默认为None,写入到cache数据 |
overwrite | bool | 覆盖已有数据,默认为False |
返回值类型:DataSource对象
使用例:
import dai
binary = b"Hello, world!"
dai.DataSource.write_binary(binary, id="ds_binary")
\
read_binary
读取binary数据。
基本用法:
DataSource对象.read_binary()
返回值类型:bytes
使用例:
import dai
dai.DataSource("ds_binary").read_binary()
# b'Hello, world!'
\
delete
删除数据。
基本用法:
DataSource对象.delete()
使用例:
import dai
dai.DataSource("ds_binary").delete()
\
write_zero
创建一个空的DataSource,并由您自己管理。
参数列表:
参数 | 类型 | 说明 |
---|---|---|
id | str | 数据id |
返回值类型:DataSource对象
使用例:
import dai
ds = dai.DataSource.write_zero("ds_zero")
\
mount
挂载DataSource目录,只支持 zero DataSource挂载。
参数列表:
参数 | 类型 | 说明 |
---|---|---|
with_write | bool | 可写模式挂载,只有DataSource创建者可以执行此操作,默认为False。 |
symlink_to | str | 创建软链到指定地址,默认为None。 |
symlink_force | bool | 强制创建软链,如果目标地址文件存在,则删除,默认为False。 |
返回值类型: DataSourceMount
使用例:
import dai
with dai.DataSource("ds_zero").mount(with_write=True) as path:
# mount到本地 path,并自动 unmount
print(f"{path=}")
# 往挂载目录可以写入数据
with open(f"{path}/a.txt", "w") as f:
f.write("Hello, world!")
\
unmount
卸载挂载的DataSource目录。
参数列表:
参数 | 类型 | 说明 |
---|---|---|
path | str | 挂载目录 |
返回值类型: 无返回值
使用例:
import dai
ds = dai.DataSource("ds_zero")
mnt = ds.mount()
print(f"{mnt.path=}")
with open(f"{mnt.path}/a.txt", "r") as f:
print(f.read())
ds.unmount(mnt.path)
\
save_view
保存视图类DataSource。
参数列表:
参数与 | 类型 | 说明 |
---|---|---|
id | str | 数据id |
sql | str | 视图sql |
update_if_exists | bool | 如果视图存在,则更新,默认为True。 |
docs | Dict[str, Any] | 视图文档,默认为None。 |
timeout | int | 超时时间,默认为300秒。 |
返回值类型: DataSource对象
使用例:
import dai
sql = """
select * from cn_stock_bar1d prune join cn_stock_valuation using (instrument, date)
"""
dai.DataSource.save_view("ds_view", sql)
result_df = dai.query("select * from test_for_fun_view where date > '2023-01-01' order by instrument, date").df()
\
persist
持久化缓存DataSource。
参数列表:
参数 | 类型 | 说明 |
---|---|---|
id | str | 数据id |
overwrite | bool | 是否覆盖已有数据源,默认为False。 |
返回值类型: DataSource对象
使用例:
import dai
text = "Hello, world!"
cached_datasource = dai.DataSource.write_text(text)
cached_datasource.persist("ds_persist")
\
⭐为什么推荐使用DataSource
🔥使用DataSource作为统一的数据格式,其具有以下优点:🔥
- DataSource作为bigmodule原生的标准且通用的数据格式,拥有更加底层的技术支持,可以方便地在模块之间进行传输,让数据流动得更自由、迅速;
- DataSource提供了许多方法,能够方便地转化为其他类型的数据,也可以由其他类型转化而来,具有强大的兼容性和通用性,可以应对实际开发和使用过程中数据类型复杂多变的情况;
- DataSource底层实现了缓存及共享机制,
- 🚀缓存机制可以帮助加快线性策略的重复执行。例如,当可视化策略中仅有部分下游模块发生了变动,而上游模块不受影响时,再次执行该策略,上游模块先前输出的结果会被直接获取,并非被重新执行一次,只有那些发生改动的模块会被执行。这样一来,便可以加快策略的执行效率,特别是在尝试调整策略的时候,将节省下许多调试等待的时间。
- 🚀共享机制可以加速数据在模块之间的传递。在调用DataSource相关方法封装数据时,都会得到一个对应的对象实例,该对象实例会带有一个特殊的属性——datasource_id(相当于数据的标记编号)。通过对象实例让数据在模块之间传递,在这个过程中,实际上仅是传递了datasource_id,而具体的数据则是保存在 redis 缓存数据库中。当模块在接收到来自另一个模块的数据datasource_id时,就可以通过该ID访问数据库来获取数据,避免了传递海量数据时带宽资源紧张的问题,有效地节约了数据传输的时间。
\