历史文档

【历史文档】高阶技巧-获取市场涨跌停统计数据,并入库,提交定时任务

由qxiao创建,最终由small_q 被浏览 164 用户

更新

本文内容对应旧版平台与旧版资源,其内容不再适合最新版平台,请查看新版平台的使用说明

新版量化开发IDE(AIStudio):

https://bigquant.com/wiki/doc/aistudio-aiide-NzAjgKapzW

新版模版策略:

https://bigquant.com/wiki/doc/demos-ecdRvuM1TU

新版数据平台:

https://bigquant.com/data/home

https://bigquant.com/wiki/doc/dai-PLSbc1SbZX

新版表达式算子:

https://bigquant.com/wiki/doc/dai-sql-Rceb2JQBdS

新版因子平台:

https://bigquant.com/wiki/doc/bigalpha-EOVmVtJMS5

\

简介

bigqunat强大的人工智能分析平台,提供了多元化的数据,方便的数据回测等,今天介绍bigqunat获取市场涨跌停统计数据,并入库,提交定时任务。分析链接

https://bigquant.com/codeshare/c8597026-33a1-46ed-b35b-034a79f38514


第一步。点击编写策略

{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}点击新建立空白代码策略,我们命名为市场统计

{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}命名为市场统计按回车就可用

{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}提供的源代码仅仅用于交流学习,案例的介绍,不用于获取数据

导入需要库dai平台开发的数据读取工具

import pandas as pd
import requests
import dai
from bigdatasource import DataSource

我们需要安日期处理交易日获取最近的一个交易日期,读取数据交易市场选择CN

 df=DataSource('trading_days').read()
  print(df)

{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}获取最近交易日的函数

def get_last_trader_date():
    '''
    获取最近的交易时间
    '''
    df=DataSource('trading_days').read()
    print(df)
    df=df[df['country_code']=='CN']
    trader_date=''.join(str(df['date'].tolist()[-1]).split('-'))[:8]
    return trader_date

获取市场涨停数据函数

 def stock_zt_pool_em(date: str = "20230621") -> pd.DataFrame:
    """
    :return: 涨停股池
    :rtype: pandas.DataFrame
    """
    url = "http://push2ex.eastmoney.com/getTopicZTPool"
    params = {
        "ut": "7eea3edcaed734bea9cbfc24409ed989",
        "dpt": "wz.ztzt",
        "Pageindex": "0",
        "pagesize": "10000",
        "sort": "fbt:asc",
        "date": date,
        "_": "1621590489736",
    }
    r = requests.get(url, params=params)
    data_json = r.json()
    if data_json["data"] is None:
        return pd.DataFrame()
    temp_df = pd.DataFrame(data_json["data"]["pool"])
    temp_df.reset_index(inplace=True)
    temp_df["index"] = range(1, len(temp_df) + 1)
    temp_df.columns = [
        "序号",
        "代码",
        "_",
        "名称",
        "最新价",
        "涨跌幅",
        "成交额",
        "流通市值",
        "总市值",
        "换手率",
        "连板数",
        "首次封板时间",
        "最后封板时间",
        "封板资金",
        "炸板次数",
        "所属行业",
        "涨停统计",
    ]
    temp_df["涨停统计"] = (
        temp_df["涨停统计"].apply(lambda x: dict(x)["days"]).astype(str)
        + "/"
        + temp_df["涨停统计"].apply(lambda x: dict(x)["ct"]).astype(str)
    )
    temp_df = temp_df[
        [
            "序号",
            "代码",
            "名称",
            "涨跌幅",
            "最新价",
            "成交额",
            "流通市值",
            "总市值",
            "换手率",
            "封板资金",
            "首次封板时间",
            "最后封板时间",
            "炸板次数",
            "涨停统计",
            "连板数",
            "所属行业",
        ]
    ]
    temp_df["首次封板时间"] = temp_df["首次封板时间"].astype(str).str.zfill(6)
    temp_df["最后封板时间"] = temp_df["最后封板时间"].astype(str).str.zfill(6)
    temp_df["最新价"] = temp_df["最新价"] / 1000
    temp_df["涨跌幅"] = pd.to_numeric(temp_df["涨跌幅"])
    temp_df["最新价"] = pd.to_numeric(temp_df["最新价"])
    temp_df["成交额"] = pd.to_numeric(temp_df["成交额"])
    temp_df["流通市值"] = pd.to_numeric(temp_df["流通市值"])
    temp_df["总市值"] = pd.to_numeric(temp_df["总市值"])
    temp_df["换手率"] = pd.to_numeric(temp_df["换手率"])
    temp_df["封板资金"] = pd.to_numeric(temp_df["封板资金"])
    temp_df["炸板次数"] = pd.to_numeric(temp_df["炸板次数"])
    temp_df["连板数"] = pd.to_numeric(temp_df["连板数"])
    temp_df['date']=date

程序运行的效果

{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}获取市场跌停数据的函数

#跌停数据
def stock_zt_pool_dtgc_em(date: str = "20220425") -> pd.DataFrame:
    """
    :param date: 交易日
    :type date: str
    :return: 跌停股池
    :rtype: pandas.DataFrame
    """
    url = "http://push2ex.eastmoney.com/getTopicDTPool"
    params = {
        "ut": "7eea3edcaed734bea9cbfc24409ed989",
        "dpt": "wz.ztzt",
        "Pageindex": "0",
        "pagesize": "10000",
        "sort": "fund:asc",
        "date": date,
        "_": "1621590489736",
    }
    r = requests.get(url, params=params)
    data_json = r.json()
    if data_json["data"] is None:
        return pd.DataFrame()
    temp_df = pd.DataFrame(data_json["data"]["pool"])
    temp_df.reset_index(inplace=True)
    temp_df["index"] = range(1, len(temp_df) + 1)
    temp_df.columns = [
        "序号",
        "代码",
        "_",
        "名称",
        "最新价",
        "涨跌幅",
        "成交额",
        "流通市值",
        "总市值",
        "动态市盈率",
        "换手率",
        "封单资金",
        "最后封板时间",
        "板上成交额",
        "连续跌停",
        "开板次数",
        "所属行业",
    ]
    temp_df = temp_df[
        [
            "序号",
            "代码",
            "名称",
            "涨跌幅",
            "最新价",
            "成交额",
            "流通市值",
            "总市值",
            "动态市盈率",
            "换手率",
            "封单资金",
            "最后封板时间",
            "板上成交额",
            "连续跌停",
            "开板次数",
            "所属行业",
        ]
    ]
    temp_df["最新价"] = temp_df["最新价"] / 1000
    temp_df["最后封板时间"] = temp_df["最后封板时间"].astype(str).str.zfill(6)

    temp_df["涨跌幅"] = pd.to_numeric(temp_df["涨跌幅"])
    temp_df["最新价"] = pd.to_numeric(temp_df["最新价"])
    temp_df["成交额"] = pd.to_numeric(temp_df["成交额"])
    temp_df["流通市值"] = pd.to_numeric(temp_df["流通市值"])
    temp_df["总市值"] = pd.to_numeric(temp_df["总市值"])
    temp_df["动态市盈率"] = pd.to_numeric(temp_df["动态市盈率"])
    temp_df["换手率"] = pd.to_numeric(temp_df["换手率"])
    temp_df["封单资金"] = pd.to_numeric(temp_df["封单资金"])
    temp_df["板上成交额"] = pd.to_numeric(temp_df["板上成交额"])
    temp_df["连续跌停"] = pd.to_numeric(temp_df["连续跌停"])
    temp_df["开板次数"] = pd.to_numeric(temp_df["开板次数"])
    temp_df["开板次数"] = pd.to_numeric(temp_df["开板次数"])
    temp_df['date']=date

    return temp_df

运行效果

{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}市场分析统计函数

def count_analysis_zt_dt_data(date='20230621'):
    '''
    市场涨停跌停统计可能和同花顺不一样
    date要求是交易日
    '''
    #涨停
    df=stock_zt_pool_em(date=date)
    #跌停数据
    df1=stock_zt_pool_dtgc_em(date=date)
    zt_amount=df.shape[0]
    dt_amount=df1.shape[0]
    text='交易日{} 涨停数量{} 跌停数量{}'.format(date,zt_amount,dt_amount)
    return df,df1,zt_amount,dt_amount,text

运行的结果

{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}主程序入口

if __name__=='__main__':
    last_trader_date=get_last_trader_date()
    print('交易日 {}'.format(last_trader_date))
    df,df1,zt_zmount,dt_amount,text=count_analysis_zt_dt_data(date=last_trader_date)
    print(df,df1,zt_zmount,dt_amount,text)
    #数据入库
    #市场涨停 Limit up of market
    dai.DataSource.write_bdb(
    data=df,
    # datasource id是全局唯一的,支持小写字母、数字、下划线,以字母开始
    id="limit_up_of_market",
    # 数据插入时,根据unique_together如果有重复的,会去重.如果有分区,则需要传入索引参数indexes
    unique_together=["date", "代码"],
    indexes=["date"],
    )
    #跌停数据入库
    #市场跌停Market limit
    dai.DataSource.write_bdb(
    data=df,
    # datasource id是全局唯一的,支持小写字母、数字、下划线,以字母开始
    id="market_limit",
    # 数据插入时,根据unique_together如果有重复的,会去重.如果有分区,则需要传入索引参数indexes
    unique_together=["date", "代码"],
    indexes=["date"],
    )

数据写入个人的数据,利用dai输入写入,id是数据库名称,unique_together用来去掉重复的数据,indexs数据的索引

#市场跌停Market limit
    dai.DataSource.write_bdb(
    data=df,
    # datasource id是全局唯一的,支持小写字母、数字、下划线,以字母开始
    id="market_limit",
    # 数据插入时,根据unique_together如果有重复的,会去重.如果有分区,则需要传入索引参数indexes
    unique_together=["date", "代码"],
    indexes=["date"],
    )

自己写入的数据可用进行查看,也可用读取,利润dai数据自己的数据库

select * form 数据名称

import dai
df=dai.query('select * from limit_up_of_market').df()
print(df)

运行结果

{w:100}{w:100}{w:100}{w:100}{w:100}{w:100}点击数据beta平台可用看到自己的数据

{w:100}{w:100}{w:100}{w:100}{w:100}{w:100} {w:100}{w:100}{w:100}{w:100}{w:100} {w:100}{w:100}{w:100}{w:100}{w:100}提交定时任务

{w:100}

import pandas as pd
import requests
import dai
from bigdatasource import DataSource
def get_last_trader_date():
    '''
    获取最近的交易时间
    '''
    df=DataSource('trading_days').read()
    df=df[df['country_code']=='CN']
    trader_date=''.join(str(df['date'].tolist()[-1]).split('-'))[:8]
    return trader_date
def stock_zt_pool_em(date: str = "20230621") -> pd.DataFrame:
    """
    :return: 涨停股池
    :rtype: pandas.DataFrame
    """
    url = "http://push2ex.eastmoney.com/getTopicZTPool"
    params = {
        "ut": "7eea3edcaed734bea9cbfc24409ed989",
        "dpt": "wz.ztzt",
        "Pageindex": "0",
        "pagesize": "10000",
        "sort": "fbt:asc",
        "date": date,
        "_": "1621590489736",
    }
    r = requests.get(url, params=params)
    data_json = r.json()
    if data_json["data"] is None:
        return pd.DataFrame()
    temp_df = pd.DataFrame(data_json["data"]["pool"])
    temp_df.reset_index(inplace=True)
    temp_df["index"] = range(1, len(temp_df) + 1)
    temp_df.columns = [
        "序号",
        "代码",
        "_",
        "名称",
        "最新价",
        "涨跌幅",
        "成交额",
        "流通市值",
        "总市值",
        "换手率",
        "连板数",
        "首次封板时间",
        "最后封板时间",
        "封板资金",
        "炸板次数",
        "所属行业",
        "涨停统计",
    ]
    temp_df["涨停统计"] = (
        temp_df["涨停统计"].apply(lambda x: dict(x)["days"]).astype(str)
        + "/"
        + temp_df["涨停统计"].apply(lambda x: dict(x)["ct"]).astype(str)
    )
    temp_df = temp_df[
        [
            "序号",
            "代码",
            "名称",
            "涨跌幅",
            "最新价",
            "成交额",
            "流通市值",
            "总市值",
            "换手率",
            "封板资金",
            "首次封板时间",
            "最后封板时间",
            "炸板次数",
            "涨停统计",
            "连板数",
            "所属行业",
        ]
    ]
    temp_df["首次封板时间"] = temp_df["首次封板时间"].astype(str).str.zfill(6)
    temp_df["最后封板时间"] = temp_df["最后封板时间"].astype(str).str.zfill(6)
    temp_df["最新价"] = temp_df["最新价"] / 1000
    temp_df["涨跌幅"] = pd.to_numeric(temp_df["涨跌幅"])
    temp_df["最新价"] = pd.to_numeric(temp_df["最新价"])
    temp_df["成交额"] = pd.to_numeric(temp_df["成交额"])
    temp_df["流通市值"] = pd.to_numeric(temp_df["流通市值"])
    temp_df["总市值"] = pd.to_numeric(temp_df["总市值"])
    temp_df["换手率"] = pd.to_numeric(temp_df["换手率"])
    temp_df["封板资金"] = pd.to_numeric(temp_df["封板资金"])
    temp_df["炸板次数"] = pd.to_numeric(temp_df["炸板次数"])
    temp_df["连板数"] = pd.to_numeric(temp_df["连板数"])
    temp_df['date']=date

    return temp_df
#跌停数据
def stock_zt_pool_dtgc_em(date: str = "20220425") -> pd.DataFrame:
    """
    :param date: 交易日
    :type date: str
    :return: 跌停股池
    :rtype: pandas.DataFrame
    """
    url = "http://push2ex.eastmoney.com/getTopicDTPool"
    params = {
        "ut": "7eea3edcaed734bea9cbfc24409ed989",
        "dpt": "wz.ztzt",
        "Pageindex": "0",
        "pagesize": "10000",
        "sort": "fund:asc",
        "date": date,
        "_": "1621590489736",
    }
    r = requests.get(url, params=params)
    data_json = r.json()
    if data_json["data"] is None:
        return pd.DataFrame()
    temp_df = pd.DataFrame(data_json["data"]["pool"])
    temp_df.reset_index(inplace=True)
    temp_df["index"] = range(1, len(temp_df) + 1)
    temp_df.columns = [
        "序号",
        "代码",
        "_",
        "名称",
        "最新价",
        "涨跌幅",
        "成交额",
        "流通市值",
        "总市值",
        "动态市盈率",
        "换手率",
        "封单资金",
        "最后封板时间",
        "板上成交额",
        "连续跌停",
        "开板次数",
        "所属行业",
    ]
    temp_df = temp_df[
        [
            "序号",
            "代码",
            "名称",
            "涨跌幅",
            "最新价",
            "成交额",
            "流通市值",
            "总市值",
            "动态市盈率",
            "换手率",
            "封单资金",
            "最后封板时间",
            "板上成交额",
            "连续跌停",
            "开板次数",
            "所属行业",
        ]
    ]
    temp_df["最新价"] = temp_df["最新价"] / 1000
    temp_df["最后封板时间"] = temp_df["最后封板时间"].astype(str).str.zfill(6)

    temp_df["涨跌幅"] = pd.to_numeric(temp_df["涨跌幅"])
    temp_df["最新价"] = pd.to_numeric(temp_df["最新价"])
    temp_df["成交额"] = pd.to_numeric(temp_df["成交额"])
    temp_df["流通市值"] = pd.to_numeric(temp_df["流通市值"])
    temp_df["总市值"] = pd.to_numeric(temp_df["总市值"])
    temp_df["动态市盈率"] = pd.to_numeric(temp_df["动态市盈率"])
    temp_df["换手率"] = pd.to_numeric(temp_df["换手率"])
    temp_df["封单资金"] = pd.to_numeric(temp_df["封单资金"])
    temp_df["板上成交额"] = pd.to_numeric(temp_df["板上成交额"])
    temp_df["连续跌停"] = pd.to_numeric(temp_df["连续跌停"])
    temp_df["开板次数"] = pd.to_numeric(temp_df["开板次数"])
    temp_df["开板次数"] = pd.to_numeric(temp_df["开板次数"])
    temp_df['date']=date

    return temp_df
def count_analysis_zt_dt_data(date='20230621'):
    '''
    市场涨停跌停统计可能和同花顺不一样
    date要求是交易日
    '''
    #涨停
    df=stock_zt_pool_em(date=date)
    #跌停数据
    df1=stock_zt_pool_dtgc_em(date=date)
    zt_amount=df.shape[0]
    dt_amount=df1.shape[0]
    text='交易日{} 涨停数量{} 跌停数量{}'.format(date,zt_amount,dt_amount)
    return df,df1,zt_amount,dt_amount,text
if __name__=='__main__':
    last_trader_date=get_last_trader_date()
    print('交易日 {}'.format(last_trader_date))
    df,df1,zt_zmount,dt_amount,text=count_analysis_zt_dt_data(date=last_trader_date)
    print(df,df1,zt_zmount,dt_amount,text)
    #数据入库
    #市场涨停 Limit up of market
    dai.DataSource.write_bdb(
    data=df,
    # datasource id是全局唯一的,支持小写字母、数字、下划线,以字母开始
    id="limit_up_of_market",
    # 数据插入时,根据unique_together如果有重复的,会去重.如果有分区,则需要传入索引参数indexes
    unique_together=["date", "代码"],
    indexes=["date"],
    )
    #跌停数据入库
    #市场跌停Market limit
    dai.DataSource.write_bdb(
    data=df,
    # datasource id是全局唯一的,支持小写字母、数字、下划线,以字母开始
    id="market_limit",
    # 数据插入时,根据unique_together如果有重复的,会去重.如果有分区,则需要传入索引参数indexes
    unique_together=["date", "代码"],
    indexes=["date"],
    )

\

评论
  • 👍
  • 未来可期
  • 定时任务 更多是按钮的操作吧 \
  • 小有名气啦~
{link}