股票配资品牌有哪些 用Python玩转量化选股:一键计算技术指标,轻松抓牛股!附完整代码与详细拆解

发布日期:2025-05-27 21:35    点击次数:125

股票配资品牌有哪些 用Python玩转量化选股:一键计算技术指标,轻松抓牛股!附完整代码与详细拆解

开篇为什么要搞这些技术指标?

说到股票分析,很多人第一反应是“看K线图”。但其实股票配资品牌有哪些,K线图只是冰山一角,真正的高手会结合各种技术指标来做决策。比如:

移动平均线(MA):帮你判断股票的中长期趋势。RSI(相对强弱指数):告诉你股票是超买还是超卖。MACD(指数平滑异同移动平均线):捕捉股票的动能变化。布林带(Bollinger Bands):衡量股票的波动性。KDJ指标:判断股票的超买超卖状态。

这些指标就像“股票分析工具箱”,有了它们,你就能更科学地选股,而不是凭感觉瞎买。

代码拆解:一步步教你用Python玩转技术指标

接下来,我会用人话带你拆解这个脚本,保证你看得懂、学得会!

1. 连接数据库,读取股票数据

首先,我们需要从数据库里读取股票的历史数据。这里用到了mysql.connector库,操作非常简单:

def connect_to_mysql(host, user, password, database):    conn = mysql.connector.connect(        host=host,        user=user,        password=password,        database=database    )    return conndef get_stock_data_from_mysql(conn, stock_code, start_date, end_date):    query = f'SELECT dt, open, close, high, low, vol, tr FROM gp_base_info ' \            f'WHERE code = %s AND dt BETWEEN %s AND %s ' \            f'ORDER BY dt ASC;'    cursor = conn.cursor()    cursor.execute(query, (stock_code, start_date, end_date))    rows = cursor.fetchall()    columns = [desc[0] for desc in cursor.description]    stock_data = pd.DataFrame(rows, columns=columns)    cursor.close()    stock_data.set_index('dt', inplace=True)    return stock_dataconnect_to_mysql:连接你的MySQL数据库。get_stock_data_from_mysql:根据股票代码和日期范围,从数据库里读取数据,并返回一个DataFrame。2. 计算移动平均线(MA)

移动平均线是判断趋势的“神器”。我们计算了10日、50日、100日和200日的移动平均线:

def calculate_moving_averages(data):    data['MA10'] = data['close'].rolling(window=10).mean()    data['MA50'] = data['close'].rolling(window=50).mean()    data['MA100'] = data['close'].rolling(window=100).mean()    data['MA200'] = data['close'].rolling(window=200).mean()    data['AVG_VOL'] = data['vol'].rolling(window=10).mean()    data['AVG_TR'] = data['tr'].rolling(window=10).mean()    return data
rolling(window=10).mean():计算10日移动平均线。AVG_VOL和AVG_TR:分别计算10日的平均成交量和换手率。3. 计算RSI(相对强弱指数)

RSI是衡量股票超买或超卖状态的指标,数值在0到100之间。通常,RSI超过70表示超买,低于30表示超卖。

def calculate_rsi(df, period=14):    df['RSI'] = talib.RSI(df['close'], timeperiod=period)    return dftalib.RSI:使用TA-Lib库计算RSI。4. 计算MACD(指数平滑异同移动平均线)

MACD是捕捉股票动能变化的指标,包括MACD线、信号线和柱状图。

def calculate_macd(df, fast_period=12, slow_period=26, signal_period=9):    macd, macd_signal, macd_hist = talib.MACD(df['close'], fastperiod=fast_period, slowperiod=slow_period, signalperiod=signal_period)    df['MACD'] = macd    df['MACD_Signal'] = macd_signal    df['MACD_Hist'] = macd_hist    return df
talib.MACD:使用TA-Lib库计算MACD。5. 计算布林带(Bollinger Bands)

布林带是衡量股票波动性的指标,包括上轨、中轨和下轨。

def calculate_bld(df):    df['BLD_MID'] = df['close'].rolling(window=20).mean()    df['STD'] = df['close'].rolling(window=20).std()    df['BLD_UP'] = df['BLD_MID'] + 2 * df['STD']    df['BLD_DOWN'] = df['BLD_MID'] - 2 * df['STD']    df.drop(columns=['STD'], inplace=True)    return dfBLD_MID:布林带的中轨(20日移动平均线)。BLD_UP和BLD_DOWN:布林带的上轨和下轨。6. 计算KDJ指标

KDJ是判断股票超买超卖状态的指标,包括K线、D线和J线。

def calculate_kdj(data, n=9):    data['MAX'] = data['close'].rolling(window=n).max()    data['MIN'] = data['close'].rolling(window=n).min()    data['RSV'] = ((data['close'] - data['MIN']) / (data['MAX'] - data['MIN']) * 100).fillna(50)    data['K'] = 50.0    data['D'] = 50.0    data['J'] = 50.0    for i in range(1, len(data)):        if pd.isna(data.loc[data.index[i], 'RSV']):            data.loc[data.index[i], 'K'] = data.loc[data.index[i - 1], 'K']            data.loc[data.index[i], 'D'] = data.loc[data.index[i - 1], 'D']        else:            data.loc[data.index[i], 'K'] = data.loc[data.index[i - 1], 'K'] * 2 / 3 + data.loc[data.index[i], 'RSV'] * 1 / 3            data.loc[data.index[i], 'D'] = data.loc[data.index[i - 1], 'D'] * 2 / 3 + data.loc[data.index[i], 'K'] * 1 / 3        data.loc[data.index[i], 'J'] = 3 * data.loc[data.index[i], 'K'] - 2 * data.loc[data.index[i], 'D']    data.drop(columns=['MAX', 'MIN', 'RSV'], inplace=True)    return data
K、D、J:分别计算KDJ指标的值。7. 判断成交量和换手率是否放大

通过比较当前成交量和换手率与过去10日的平均值,判断是否放量。

def check_vol_tr(data, date):    row = data.loc[date]    last_vol = float(row['vol'])    last_tr = float(row['tr'])    avg_vol = row['AVG_VOL']    avg_tr = row['AVG_TR']    if np.isnan(avg_vol):        avg_vol = last_vol    if np.isnan(avg_tr):        avg_tr = last_tr    if avg_vol == 0:        avg_vol_rate = 1.0    else:        avg_vol_rate = last_vol / avg_vol    if avg_tr == 0:        avg_tr_rate = 1.0    else:        avg_tr_rate = last_tr / avg_tr    return (avg_vol_rate, avg_tr_rate)avg_vol_rate和avg_tr_rate:分别表示成交量和换手率的放大倍数。8. 判断趋势

通过比较收盘价与不同周期的移动平均线,判断短期、中期和长期趋势。

def check_trend(data, date):    row = data.loc[date]    last_close = row['close']    last_ma10 = row['MA10']    last_ma50 = row['MA50']    last_ma100 = row['MA100']    last_ma200 = row['MA200']    trend1 = 0    trend2 = 0    trend3 = 0    if not np.isnan(last_ma10) and not np.isnan(last_ma50) and last_close > last_ma10 > last_ma50:        trend1 = 1    if not np.isnan(last_ma50) and not np.isnan(last_ma100) and last_close < last_ma50 < last_ma100:        trend2 = 1    if not np.isnan(last_ma100) and not np.isnan(last_ma200) and last_close < last_ma100 < last_ma200:        trend3 = 1    return (trend1, trend2, trend3)

trend1、trend2、trend3:分别表示短期、中期和长期趋势。

9. 批量插入结果到数据库

将计算出的指标批量插入到数据库中。

def insert_results_to_mysql_batch(conn, data_list):    cursor = conn.cursor()    query = '''    INSERT INTO stock_indicators (code, dt, open, close, ma10, ma50, ma100, ma200, vol, tr, trend1, trend2, trend3,     vol_add_rate, tr_add_rate,rsi,macd,macd_signal,macd_hist,bld_up,bld_mid,bld_down,k,d,j)     VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)    ON DUPLICATE KEY UPDATE        open = VALUES(open),        close = VALUES(close),        ma10 = VALUES(ma10),        ma50 = VALUES(ma50),        ma100 = VALUES(ma100),        ma200 = VALUES(ma200),        vol = VALUES(vol),        tr = VALUES(tr),        trend1 = VALUES(trend1),        trend2 = VALUES(trend2),        trend3 = VALUES(trend3),        vol_add_rate = VALUES(vol_add_rate),        tr_add_rate = VALUES(tr_add_rate),        bld_up = VALUES(bld_up),        bld_mid = VALUES(bld_mid),        bld_down = VALUES(bld_down),        k = VALUES(k),        d = VALUES(d),        j = VALUES(j)    '''    cleaned_data_list = []    for data in data_list:        cleaned_data = [0 if isinstance(value, float) and np.isnan(value) else value for value in data]        cleaned_data_list.append(cleaned_data)    cursor.executemany(query, cleaned_data_list)    conn.commit()    cursor.close()insert_results_to_mysql_batch:将计算出的指标批量插入到stock_indicators表中。10. 核心函数:处理单只股票

这是整个脚本的核心部分,负责处理每只股票的指标计算和数据存储。

def process_stock_code(stock_code, start_date, end_date):    # 连接数据库    conn = connect_to_mysql(host, user, password, database)    # 读取数据    stock_data = get_stock_data_from_mysql(conn, stock_code, start_date, end_date)    # 计算移动平均线    stock_data = calculate_moving_averages(stock_data)    # 计算RSI    stock_data = calculate_rsi(stock_data)    # 计算MACD    stock_data = calculate_macd(stock_data)    # 计算布林带    stock_data = calculate_bld(stock_data)    # 计算KDJ    stock_data = calculate_kdj(stock_data)    # 批量插入的数据列表    batch_data = []    # 逐日判断趋势并插入 MySQL 表    for date in stock_data.index:        # 判断成交量和换手率是否放大        vol_add_rate, tr_add_rate = check_vol_tr(stock_data, date)        # 判断短期、中期和长期趋势        trend1, trend2, trend3 = check_trend(stock_data, date)        row = stock_data.loc[date]        # 准备批量插入的数据        batch_data.append((            stock_code, date, row['open'], row['close'], row['MA10'], row['MA50'], row['MA100'], row['MA200'],            int(row['vol']), row['tr'], trend1, trend2, trend3, vol_add_rate, tr_add_rate, row['RSI'], row['MACD'],            row['MACD_Signal'], row['MACD_Hist'], row['BLD_UP'], row['BLD_MID'], row['BLD_DOWN'], row['K'], row['D'], row['J']        ))    # 批量插入数据    insert_results_to_mysql_batch(conn, batch_data)    # 关闭数据库连接    conn.close()
process_stock_code:负责处理每只股票的指标计算和数据存储。11. 多线程处理

为了提高效率,我们使用多线程来并行处理每只股票的数据。

stock_codes = get_all_code()# 设置日期范围start_date = '2024-02-20'end_date = datetime.now().strftime('%Y-%m-%d')  # 今天# 使用多线程处理每个股票代码with ThreadPoolExecutor(max_workers=12) as executor:    executor.map(lambda code: process_stock_code(code, start_date, end_date), stock_codes)ThreadPoolExecutor:创建一个最大线程数为12的线程池。完整代码
import mysql.connectorimport pandas as pdfrom datetime import datetime, timedeltaimport numpy as npimport requestsimport talibfrom concurrent.futures import ThreadPoolExecutor# 连接 MySQL 数据库def connect_to_mysql(host, user, password, database):    conn = mysql.connector.connect(        host=host,        user=user,        password=password,        database=database    )    return conn# 从 MySQL 读取股票数据def get_stock_data_from_mysql(conn, stock_code, start_date, end_date):    query = f'SELECT dt, open, close, high, low, vol, tr FROM gp_base_info ' \            f'WHERE code = %s AND dt BETWEEN %s AND %s ' \            f'ORDER BY dt ASC;'    cursor = conn.cursor()    cursor.execute(query, (stock_code, start_date, end_date))    rows = cursor.fetchall()    columns = [desc[0] for desc in cursor.description]    stock_data = pd.DataFrame(rows, columns=columns)    cursor.close()    # stock_data = pd.read_sql(query, conn)    stock_data.set_index('dt', inplace=True)    return stock_data# 计算每日的移动平均线def calculate_moving_averages(data):    data['MA10'] = data['close'].rolling(window=10).mean()    data['MA50'] = data['close'].rolling(window=50).mean()    data['MA100'] = data['close'].rolling(window=100).mean()    data['MA200'] = data['close'].rolling(window=200).mean()    data['AVG_VOL'] = data['vol'].rolling(window=10).mean()    data['AVG_TR'] = data['tr'].rolling(window=10).mean()    return data# 计算 RSIdef calculate_rsi(df, period=14):    df['RSI'] = talib.RSI(df['close'], timeperiod=period)    return df# 计算 MACDdef calculate_macd(df, fast_period=12, slow_period=26, signal_period=9):    macd, macd_signal, macd_hist = talib.MACD(df['close'], fastperiod=fast_period, slowperiod=slow_period, signalperiod=signal_period)    df['MACD'] = macd    df['MACD_Signal'] = macd_signal    df['MACD_Hist'] = macd_hist    return df# 计算布林带def calculate_bld(df):    # 计算布林带    df['BLD_MID'] = df['close'].rolling(window=20).mean()    df['STD'] = df['close'].rolling(window=20).std()    df['BLD_UP'] = df['BLD_MID'] + 2 * df['STD']    df['BLD_DOWN'] = df['BLD_MID'] - 2 * df['STD']    df.drop(columns=['STD'], inplace=True)    return dfdef calculate_kdj(data, n=9):    try:        # 将 'close' 列转换为 float 类型        data['close'] = data['close'].astype(float)        # 计算 MAX 和 MIN        data['MAX'] = data['close'].rolling(window=n).max()        data['MIN'] = data['close'].rolling(window=n).min()        # 计算 RSV,避免除以 0 或 NaN        data['RSV'] = ((data['close'] - data['MIN']) / (data['MAX'] - data['MIN']) * 100).fillna(50)        # 初始化 K、D、J 列,显式设置为 float 类型        data['K'] = 50.0  # 使用 50.0 而不是 50        data['D'] = 50.0        data['J'] = 50.0        # 计算 K、D、J 值        for i in range(1, len(data)):            # 如果 RSV 是 NaN,保持 K 和 D 的值不变            if pd.isna(data.loc[data.index[i], 'RSV']):                data.loc[data.index[i], 'K'] = data.loc[data.index[i - 1], 'K']                data.loc[data.index[i], 'D'] = data.loc[data.index[i - 1], 'D']            else:                data.loc[data.index[i], 'K'] = data.loc[data.index[i - 1], 'K'] * 2 / 3 + data.loc[                    data.index[i], 'RSV'] * 1 / 3                data.loc[data.index[i], 'D'] = data.loc[data.index[i - 1], 'D'] * 2 / 3 + data.loc[                    data.index[i], 'K'] * 1 / 3            # 计算 J 值            data.loc[data.index[i], 'J'] = 3 * data.loc[data.index[i], 'K'] - 2 * data.loc[data.index[i], 'D']        # 删除中间变量        data.drop(columns=['MAX', 'MIN', 'RSV'], inplace=True)    except Exception as e:        print(f'捕获到异常: {e}')    return data# 判断成交和还手是否放大def check_vol_tr(data,date):    # 获取指定日期的数据    row = data.loc[date]    last_vol= float(row['vol'])    last_tr = float(row['tr'])    avg_vol = row['AVG_VOL']    avg_tr = row['AVG_TR']    if np.isnan(avg_vol):        avg_vol = last_vol    if np.isnan(avg_tr):        avg_tr = last_tr    if avg_vol == 0:        avg_vol_rate = 1.0    else:        avg_vol_rate = last_vol / avg_vol    if avg_tr == 0:        avg_tr_rate = 1.0    else:        avg_tr_rate = last_tr / avg_tr    return (avg_vol_rate, avg_tr_rate)# 判断短期的趋势def check_trend(data, date):    # 获取指定日期的数据    row = data.loc[date]    last_close = row['close']    last_ma10 = row['MA10']    last_ma50 = row['MA50']    last_ma100 = row['MA100']    last_ma200 = row['MA200']    trend1 = 0    trend2 = 0    trend3 = 0    # 判断趋势    if not np.isnan(last_ma10) and not np.isnan(last_ma50) and last_close > last_ma10 > last_ma50:        trend1 = 1    if not np.isnan(last_ma50) and not np.isnan(last_ma100) and last_close < last_ma50 < last_ma100:        trend2 = 1    if not np.isnan(last_ma100) and not np.isnan(last_ma200) and last_close < last_ma100 < last_ma200:        trend3 = 1    return (trend1, trend2, trend3)# 批量插入结果到 MySQL 表def insert_results_to_mysql_batch(conn, data_list):    cursor = conn.cursor()    query = '''    INSERT INTO stock_indicators (code, dt, open, close, ma10, ma50, ma100, ma200, vol, tr, trend1, trend2, trend3,     vol_add_rate, tr_add_rate,rsi,macd,macd_signal,macd_hist,bld_up,bld_mid,bld_down,k,d,j)     VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)    ON DUPLICATE KEY UPDATE        open = VALUES(open),        close = VALUES(close),        ma10 = VALUES(ma10),        ma50 = VALUES(ma50),        ma100 = VALUES(ma100),        ma200 = VALUES(ma200),        vol = VALUES(vol),        tr = VALUES(tr),        trend1 = VALUES(trend1),        trend2 = VALUES(trend2),        trend3 = VALUES(trend3),        vol_add_rate = VALUES(vol_add_rate),        tr_add_rate = VALUES(tr_add_rate),        bld_up = VALUES(bld_up),        bld_mid = VALUES(bld_mid),        bld_down = VALUES(bld_down),        k = VALUES(k),        d = VALUES(d),        j = VALUES(j)    '''    # 替换 NaN 值为 0    cleaned_data_list = []    for data in data_list:        cleaned_data = [0 if isinstance(value, float) and np.isnan(value) else value for value in data]        cleaned_data_list.append(cleaned_data)    cursor.executemany(query, cleaned_data_list)    conn.commit()    cursor.close()def get_all_code() :    all_code_url = 'http://44.push2.eastmoney.com/api/qt/clist/get?pn=1&pz=10000&po=1&np=1&ut=bd1d9ddb04089700cf9c27f6f7426281&fltt=2&invt=2&fid=f3&fs=m:0+t:6,m:0+t:13,m:0+t:80,m:1+t:2,m:1+t:23&fields=f12&_=1579615221139'    r = requests.get(all_code_url, timeout=5).json()    return [data['f12'] for data in r['data']['diff']]# 示例:连接数据库并读取数据host = '192.168.1.128'user = 'root'password = '123456'database = 'gp_data'# 处理单个股票代码的函数def process_stock_code(stock_code, start_date, end_date):    # 连接数据库    conn = connect_to_mysql(host, user, password, database)    # 读取数据    stock_data = get_stock_data_from_mysql(conn, stock_code, start_date, end_date)    # 计算移动平均线    stock_data = calculate_moving_averages(stock_data)    # 计算rsi    stock_data = calculate_rsi(stock_data)    # 计算macd    stock_data = calculate_macd(stock_data)    # 计算布林带    stock_data = calculate_bld(stock_data)    # 计算kdj    stock_data = calculate_kdj(stock_data)    # 批量插入的数据列表    batch_data = []    # 逐日判断趋势并插入 MySQL 表    for date in stock_data.index:        # 判断成交量和还手是否放大        vol_add_rate, tr_add_rate = check_vol_tr(stock_data, date)        # 判断短期、中期和长期趋势        trend1, trend2, trend3 = check_trend(stock_data, date)        row = stock_data.loc[date]        # 准备批量插入的数据        batch_data.append((            stock_code, date, row['open'], row['close'], row['MA10'], row['MA50'], row['MA100'], row['MA200'],            int(row['vol']), row['tr'], trend1, trend2, trend3, vol_add_rate, tr_add_rate,row['RSI'],row['MACD'],            row['MACD_Signal'], row['MACD_Hist'],row['BLD_UP'],row['BLD_MID'],row['BLD_DOWN'],row['K'],row['D'],row['J']        ))    # 批量插入数据    insert_results_to_mysql_batch(conn, batch_data)    conn.close()stock_codes = get_all_code()# 设置日期范围start_date = '2024-02-20'end_date = datetime.now().strftime('%Y-%m-%d')  # 今天# end_date = '2024-12-31'# 使用多线程处理每个股票代码with ThreadPoolExecutor(max_workers=12) as executor:    executor.map(lambda code: process_stock_code(code, start_date, end_date), stock_codes)
数据库脚本CREATE TABLE `stock_indicators` (  `code` varchar(12) COLLATE utf8mb4_general_ci NOT NULL,  `dt` date NOT NULL,  `open` decimal(15,2) DEFAULT NULL COMMENT '开盘价',  `close` decimal(15,2) DEFAULT NULL COMMENT '收盘价',  `ma10` decimal(15,2) DEFAULT NULL COMMENT '10日移动平均',  `ma50` decimal(15,2) DEFAULT NULL COMMENT '50日移动平均',  `ma100` decimal(15,2) DEFAULT NULL COMMENT '100日移动平均',  `ma200` decimal(15,2) DEFAULT NULL COMMENT '200日移动平均',  `vol` bigint DEFAULT NULL COMMENT '成交量',  `tr` decimal(15,2) DEFAULT NULL COMMENT '换手率',  `trend1` int DEFAULT NULL COMMENT '短期趋势,0表示横盘,1表示上涨,2表示下跌',  `trend2` int DEFAULT NULL COMMENT '中期趋势,0表示横盘,1表示上涨,2表示下跌',  `trend3` int DEFAULT NULL COMMENT '长期趋势,0表示横盘,1表示上涨,2表示下跌',  `vol_add_rate` decimal(15,2) DEFAULT NULL COMMENT '成交量放大比例',  `tr_add_rate` decimal(15,2) DEFAULT NULL COMMENT '换手率放大比例',  `rsi` decimal(15,2) DEFAULT NULL COMMENT '相对强弱指数,RSI 在 70 以上表示超买,30 以下表示超卖',  `macd` decimal(15,2) DEFAULT NULL COMMENT 'MACD 线,当 MACD 线向上穿越 0 轴时,表示短期趋势强于长期趋势,可能是买入信号。当 MACD 线向下穿越 0 轴时,表示短期趋势弱于长期趋势,可能是卖出信号',  `macd_signal` decimal(15,2) DEFAULT NULL COMMENT 'MACD信号线,当 MACD 线向上穿越信号线时,通常被认为是买入信号。当 MACD 线向下穿越信号线时,通常被认为是卖出信号。\r\n\r\n',  `macd_hist` decimal(15,2) DEFAULT NULL COMMENT 'MACD柱状图,当柱状图为正时,表示 MACD 线在信号线之上,市场可能处于上升趋势。当柱状图为负时,表示 MACD 线在信号线之下,市场可能处于下降趋势。柱状图的放大或缩小可以反映趋势的强度变化。',  `bld_up` decimal(15,2) DEFAULT NULL COMMENT '布林带上轨',  `bld_mid` decimal(15,2) DEFAULT NULL COMMENT '布林带中轨',  `bld_down` decimal(15,2) DEFAULT NULL COMMENT '布林带下轨',  `k` decimal(15,2) DEFAULT NULL COMMENT 'KDJ指标的K值',  `d` decimal(15,2) DEFAULT NULL COMMENT 'KDJ指标的D值',  `j` decimal(15,2) DEFAULT NULL COMMENT 'KDJ指标的J值',  PRIMARY KEY (`code`,`dt`),  KEY `idx_dt` (`dt`) USING BTREE) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='指标表';总结

通过如上代码,你可以轻松计算各种技术指标,并基于这些指标实现智能选股。无论是做量化交易,还是做技术分析,这些工具都能让你事半功倍。下一篇将会讲到如何利用这些指标做智能选股!如果你觉得这篇文章有用,欢迎点赞、转发,也欢迎在评论区留言讨论!

关注我股票配资品牌有哪些,获取更多Python实战技巧!

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报。