Commit 5ca753ea authored by Zhang Xiaoli's avatar Zhang Xiaoli
Browse files

scalebox平台更新,增加DAG逻辑、路由转发。处理mbi一级流水线。

parent 94bc65ad
CLUSTER=csst-nao
all: reset build
run:
# PGHOST=192.168.25.27 GRPC_SERVER=192.168.25.27 scalebox app create --env-file csu.env
PGHOST=10.3.10.28:9090 scalebox app create --env-file scalebox.env
reset:
cd ${HOME}/docker-scalebox/clusters/$(CLUSTER) && make && cd -
down:
make -C ${HOME}/docker-scalebox/clusters/$(CLUSTER) down
list:
make -C ${HOME}/docker-scalebox/clusters/$(CLUSTER) list
# 预处理消息路由模块
以python语言编写
## 流水线结构
### 接收redis-cli消息,根据DAG yaml文件分发
- .tasks.task[n]不含upstream_tasks的模块为第一个接收消息的模块
- 检索.tasks.task[n]中upstream_tasks,检索模块间链接关系,将消息下发给下级模块
### qc0
### mbi
### mbi-photmix
\ No newline at end of file
name: csst-msc-l1-mbi.app.process
label: 主巡天一级流水线(mbi)
comment: 主巡天一级流水线
cluster: csst-nao
parameters:
initial_status: RUNNING
message_router: message-router-csst
jobs:
message-router-csst:
label: 主消息路由
base_image: csst/message-router-mbi
# schedule_mode: HEAD
parameters:
tasks_per_queue: 500
#key_group_regex: ^(.+)$
#start_message: any
arguments:
max_sleep_count: 6000
slot_options: slot_on_head
environments:
- message=tar1266932744
- dataset_id=${DATASET_ID}
- star_s=${STAR_S}
- end_st=${END_ST}
- size=${SIZE}
# - PGHOST=10.3.10.28:9090
paths:
- ${CSST_DAT_YAML_PATH}:/dag-yaml
sink_vjobs:
- csst-msc-l1-qc0
- csst-msc-l1-mbi
- csst-msc-l1-mbi-photmix
csst-msc-l1-qc0:
label: QC0
base_image: cnic/csst-msc-l1-qc0
# schedule_mode: HEAD
arguments:
# always_running: yes
# reserved_on_exit: yes
output_text_size: 100000
text_tranc_mode: TAIL
locale_mode: NONE
# max_sleep_count: 60000
# grpc_server: 172.24.23.6:50051
parameters:
# start_message: 10160000068
key_group_regex: ^(.{6})(.{3})$
key_group_seq: 1,2
environments:
- CSST_DFS_API_MODE=${CSST_DFS_API_MODE}
- CSST_DFS_GATEWAY=${CSST_DFS_GATEWAY}
- CSST_DFS_APP_ID=${CSST_DFS_APP_ID}
- CSST_DFS_APP_TOKEN=${CSST_DFS_APP_TOKEN}
- CCDS_SERVER_URL=${CCDS_SERVER_URL}
# - PGHOST=10.3.10.28:9090
paths:
- ${CSST_AUX_ROOT}:/pipeline/aux
- ${CSST_DFS_ROOT}:/dfs_root
- ${CCDS_ROOT}:/ccds_root
- ${CSST_AST_TEMP}:/pipeline/temp
# command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
sink_vjobs:
- csst-msc-l1-mbi
hosts:
- h0:1
csst-msc-l1-mbi:
label: 多色成像
base_image: cnic/csst-msc-l1-mbi
# schedule_mode: HEAD
arguments:
# always_running: yes
# reserved_on_exit: yes
output_text_size: 100000
text_tranc_mode: TAIL
locale_mode: NONE
# grpc_server: 10.3.10.28:50051
parameters:
# start_message: 10160000068
key_group_regex: ^(.{6})(.{3})$
key_group_seq: 1,2
environments:
- CSST_DFS_API_MODE=${CSST_DFS_API_MODE}
- CSST_DFS_GATEWAY=${CSST_DFS_GATEWAY}
- CSST_DFS_APP_ID=${CSST_DFS_APP_ID}
- CSST_DFS_APP_TOKEN=${CSST_DFS_APP_TOKEN}
- CCDS_SERVER_URL=${CCDS_SERVER_URL}
# - PGHOST=10.3.10.28:9090
paths:
- ${CSST_AUX_ROOT}:/pipeline/aux
- ${CSST_DFS_ROOT}:/dfs_root
- ${CCDS_ROOT}:/ccds_root
- ${CSST_AST_TEMP}:/pipeline/temp
# command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
sink_vjobs:
- csst-msc-l1-mbi-photmix
hosts:
- h0:1
# - c0:10
# - c1:1
# - c2:1
csst-msc-l1-mbi-photmix:
label: 多色成像测光
base_image: cnic/csst-msc-l1-mbi-photmix
# schedule_mode: HEAD
arguments:
# always_running: yes
# reserved_on_exit: yes
output_text_size: 100000
text_tranc_mode: TAIL
locale_mode: NONE
# grpc_server: 172.24.23.6:50051
parameters:
# start_message: 10160000068
key_group_regex: ^(.{6})(.{3})$
key_group_seq: 1,2
environments:
- CSST_DFS_API_MODE=${CSST_DFS_API_MODE}
- CSST_DFS_GATEWAY=${CSST_DFS_GATEWAY}
- CSST_DFS_APP_ID=${CSST_DFS_APP_ID}
- CSST_DFS_APP_TOKEN=${CSST_DFS_APP_TOKEN}
- CCDS_SERVER_URL=${CCDS_SERVER_URL}
# - PGHOST=10.3.10.28:9090
paths:
- ${CSST_AUX_ROOT}:/pipeline/aux
- ${CSST_DFS_ROOT}:/dfs_root
- ${CCDS_ROOT}:/ccds_root
- ${CSST_AST_TEMP}:/pipeline/temp
# command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
hosts:
- h0:1
# - c0:10
# - c1:1
# - c2:1
\ No newline at end of file
FROM hub.cstcloud.cn/scalebox/agent
LABEL maintainer="Zhang Xiaoli <zhangxiaoli@cnic.cn>"
# 安装python
RUN apt-get update \
&& apt-get install -y python3 python3-pip\
&& pip3 install pyyaml --break-system-packages\
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
COPY *.sh *.py /app/bin/
ENV WITH_HEADERS=yes
ENV ACTION_RUN=/app/bin/messageRoute.py
RUN chmod +x /app/bin/messageRoute.py
#ENV ACTION_RUN=/app/bin/run.py
#controld version 202208
#RUN chmod +x /app/bin/run.sh
IMAGE_NAME:=csst/message-router-mbi
build:
docker build --network=host -t $(IMAGE_NAME) .
dist:
docker save $(IMAGE_NAME) | zstdmt | pv | ssh c0 'zstd -d | docker load'
push:
docker push $(IMAGE_NAME)
run:
docker run -it --entrypoint bash $(IMAGE_NAME)
down:
docker stop $(IMAGE_NAME)
# 定义DAG的基本信息
dag_id: "csst-msc-l1-mbi"
# 定义任务
tasks:
task1:
image: csst-msc-l1-qc0
tag: latest
force_pull: true
bash_command: "python /pipeline/app/run.py {message}"
task2:
image: csst-msc-l1-mbi
tag: latest
force_pull: true
bash_command: "python /pipeline/app/run.py {message}"
upstream_tasks: ["task1"] # 定义依赖关系,task2依赖task1
task3:
image: csst-msc-l1-mbi-photmix
tag: latest
force_pull: true
bash_command: "python /pipeline/app/run.py {message}"
upstream_tasks: ["task2"] # 定义依赖关系,task3依赖task2
#!/usr/bin/python3
#-*- coding:utf-8 -*-
import sys
import json
import subprocess
import yaml
class messageRoute():
def __init__(self):
self.message = ""
self.headers = ""
def route_all(self,message,headers):
if headers =="null" or "from_job" not in headers:
print("received redis-cli message")
messageRoute.appready(message)
else:
try:
headersstr = json.loads(headers)
from_job=headersstr["from_job"]
#from_ip=headersstr["from_ip"]
print("from_job :"+from_job)
messageRoute.sendsinkjobs(from_job,message)
except json.JSONDecodeError as e:
print("Invalid JSON format in headers:", e)
@classmethod
def appready(self,message):
#解析对应的DAG文件
with open('/dag-yaml/csst-msc-l1-mbi.yml', 'r', encoding='utf-8') as file:
config = yaml.safe_load(file)
# 打印解析后的数据
#print(config)
# 检索yaml文件中tasks下有多少模块
key_to_check = 'tasks'
subkey_count = 0
if key_to_check in config and isinstance(config[key_to_check], dict):
subkey_count = len(config[key_to_check])
print("The key "+key_to_check+" has "+str(subkey_count)+" subkeys.")
if subkey_count == 0:
print("DAG file is not correct.")
else:
for i in range(subkey_count):
subkey = "task"+str(i+1)
upstream_tasks = config['tasks'][subkey].get('upstream_tasks')
# print("subkey "+subkey)
# 不包含upstream_tasks的模块为流水线的第一个接收消息的模块
if upstream_tasks is None:
sink_job=config['tasks'][subkey].get('image')
print("The header job is "+sink_job)
messageRoute.sendmsg(sink_job,message)
@classmethod
def sendsinkjobs(self,from_job,message):
with open('/dag-yaml/csst-msc-l1-mbi.yml', 'r') as file:
config = yaml.safe_load(file)
# 检索yaml文件中tasks下有多少模块
key_to_check = 'tasks'
subkey_count = 0
if key_to_check in config and isinstance(config[key_to_check], dict):
subkey_count = len(config[key_to_check])
print("The key "+key_to_check+" has "+str(subkey_count)+" subkeys.")
if subkey_count == 0:
print("DAG file is not correct.")
return 104 #DAG文件异常
# 检索from_job在DAG中的task标签值
for i in range(subkey_count):
subkey = "task"+str(i+1)
if(from_job == config[key_to_check][subkey].get('image')):
target_value = subkey
for i in range(subkey_count):
subkey = "task"+str(i+1)
upstream_tasks = config[key_to_check][subkey].get('upstream_tasks')
# 不包含upstream_tasks的模块为流水线的第一个接收消息的模块
if upstream_tasks is None:
continue
# 包含upstream_tasks的模块,检索其值是否含有指定模块
else:
if target_value in upstream_tasks:
sink_job=config[key_to_check][subkey].get('image')
messageRoute.sendmsg(sink_job,message)
@classmethod
def sendmsg(self,job,message):
#执行解包操作
command = f"scalebox task add -sink-job={job} {message}"
result=subprocess.run(command, shell=True)
if result.returncode == 0:
print(f"send message {message} to {job}")
print("命令执行成功")
return result.returncode
else:
print(f"命令执行失败,返回码为: {result.returncode}")
return result.returncode
if __name__ == '__main__':
parameter = sys.argv
message=parameter[1]
headers=parameter[2]
print('message '+message)
print('headers '+headers)
#如何接收到headers
w=messageRoute()
w.route_all(message,headers)
#!/bin/bash
python3 /app/bin/messageRoute.py $*
\ No newline at end of file
# DFS
CSST_DFS_GATEWAY=10.3.10.28:30880
CSST_DFS_API_MODE=cluster
CSST_DFS_APP_ID=test
CSST_DFS_APP_TOKEN=test
CSST_DFS_LOGS_DIR="."
# CCDS
CCDS_SERVER_URL=http://10.3.10.28:29000
CCDS_USER=USER
CCDS_PASS=PASS
# VOLUMES
# /dfs_root:ro
CSST_DFS_ROOT=/nfsdata/share/dfs/dfs_root
# /ccds_root:ro
CCDS_ROOT=/nfsdata/share/dfs/ccds_root
# /pipeline/aux:ro
CSST_AUX_ROOT=/nfsdata/share/pipeline-inttest/aux
# /pipeline/temp:rw
CSST_AST_TEMP=/nfsdata/share/pipeline-inttest/ast_temp
# DAG yaml /dag-yaml:ro
CSST_DAT_YAML_PATH=/nfsdata/share/dag-yaml
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment