Commit 33f17cdc authored by Zhang Xiaoli's avatar Zhang Xiaoli
Browse files

dag-yaml/csst-msc-l1-mbi.yml文件格式修改,相应调整代码

parent 8ef3864a
CLUSTER=csst-nao
CLUSTER=csst-csu
all: reset build
run:
# PGHOST=192.168.25.27 GRPC_SERVER=192.168.25.27 scalebox app create --env-file csu.env
PGHOST=10.3.10.28:9090 scalebox app create --env-file scalebox.env
scalebox app create --env-file scalebox.env
reset:
cd ${HOME}/docker-scalebox/clusters/$(CLUSTER) && make && cd -
......
name: csst-msc-l1-mbi.apps
label: 主巡天一级流水线(mbi)
comment: 主巡天一级流水线
cluster: csst-zj
cluster: csst-csu
parameters:
initial_status: RUNNING
message_router: message-router-csst
......@@ -42,8 +42,7 @@ jobs:
output_text_size: 100000
text_tranc_mode: TAIL
locale_mode: NONE
# max_sleep_count: 60000
# grpc_server: 172.24.23.6:50051
# max_sleep_count: 60000
parameters:
# start_message: 10160000068
key_group_regex: ^(.{6})(.{3})$
......@@ -89,44 +88,38 @@ jobs:
- ${CCDS_ROOT}:/ccds_root
- ${CSST_AST_TEMP}:/pipeline/temp
# command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
sink_vjobs:
- csst-msc-l1-mbi-photmix
# sink_vjobs:
# - csst-msc-l1-mbi-photmix
hosts:
- h0:1
# - n0:1
# - n1:1
csst-msc-l1-mbi-photmix:
label: 多色成像测光
base_image: cnic/csst-msc-l1-mbi-photmix
# schedule_mode: HEAD
arguments:
# always_running: yes
# reserved_on_exit: yes
output_text_size: 100000
text_tranc_mode: TAIL
locale_mode: NONE
# grpc_server: 172.24.23.6:50051
parameters:
# start_message: 10160000068
key_group_regex: ^(.{6})(.{3})$
key_group_seq: 1,2
environments:
# - CSST_DFS_API_MODE=${CSST_DFS_API_MODE}
# - CSST_DFS_GATEWAY=${CSST_DFS_GATEWAY}
# - CSST_DFS_APP_ID=${CSST_DFS_APP_ID}
# - CSST_DFS_APP_TOKEN=${CSST_DFS_APP_TOKEN}
- CSST_DFS_GATEWAY=${CSST_DFS_GATEWAY}
- CSST_DFS_TOKEN=${CSST_DFS_TOKEN}
- CCDS_SERVER_URL=${CCDS_SERVER_URL}
# - PGHOST=10.3.10.28:9090
paths:
- ${CSST_AUX_ROOT}:/pipeline/aux
- ${CSST_DFS_ROOT}:/dfs_root
- ${CCDS_ROOT}:/ccds_root
- ${CSST_AST_TEMP}:/pipeline/temp
# command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
hosts:
- h0:1
# - n0:1
# - n1:1
# csst-msc-l1-mbi-photmix:
# label: 多色成像测光
# base_image: cnic/csst-msc-l1-mbi-photmix
# # schedule_mode: HEAD
# arguments:
# # always_running: yes
# # reserved_on_exit: yes
# output_text_size: 100000
# text_tranc_mode: TAIL
# locale_mode: NONE
# # grpc_server: 172.24.23.6:50051
# parameters:
# # start_message: 10160000068
# key_group_regex: ^(.{6})(.{3})$
# key_group_seq: 1,2
# environments:
# - CSST_DFS_GATEWAY=${CSST_DFS_GATEWAY}
# - CSST_DFS_TOKEN=${CSST_DFS_TOKEN}
# - CCDS_SERVER_URL=${CCDS_SERVER_URL}
# # - PGHOST=10.3.10.28:9090
# paths:
# - ${CSST_AUX_ROOT}:/pipeline/aux
# - ${CSST_DFS_ROOT}:/dfs_root
# - ${CCDS_ROOT}:/ccds_root
# - ${CSST_AST_TEMP}:/pipeline/temp
# # command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
# hosts:
# - h0:1
\ No newline at end of file
# 定义DAG的基本信息
dag_id: "csst-msc-l1-mbi"
# 定义任务
tasks:
task1:
image: csst-msc-l1-qc0
tag: latest
force_pull: true
bash_command: "python /pipeline/app/run.py {message}"
task2:
image: csst-msc-l1-mbi
tag: latest
force_pull: true
bash_command: "python /pipeline/app/run.py {message}"
upstream_tasks: ["task1"] # 定义依赖关系,task2依赖task1
task3:
image: csst-msc-l1-mbi-photmix
tag: latest
force_pull: true
bash_command: "python /pipeline/app/run.py {message}"
upstream_tasks: ["task2"] # 定义依赖关系,task3依赖task2
- dag_id: csst-msc-l1-mbi
dag:
tasks:
- name: QC0
image: csst-msc-l1-qc0
- name: MBI
dependencies: [QC0]
image: csst-msc-l1-mbi
- name: AST
dependencies: [QC0]
image: csst-msc-l1-ast
- name: SSO
dependencies: [AST]
template: csst-msc-l1-ast-sso
\ No newline at end of file
......@@ -31,77 +31,38 @@ class messageRoute():
@classmethod
def appready(self,dag_run_id,message):
#解析对应的DAG文件
with open('/dag-yaml/csst-msc-l1-mbi.yml', 'r', encoding='utf-8') as file:
config = yaml.safe_load(file)
# 打印解析后的数据
#print(config)
# 检索yaml文件中tasks下有多少模块
key_to_check = 'tasks'
subkey_count = 0
if key_to_check in config and isinstance(config[key_to_check], dict):
subkey_count = len(config[key_to_check])
print("The key "+key_to_check+" has "+str(subkey_count)+" subkeys.")
if subkey_count == 0:
print("DAG file is not correct.")
else:
for i in range(subkey_count):
subkey = "task"+str(i+1)
upstream_tasks = config['tasks'][subkey].get('upstream_tasks')
# 不包含upstream_tasks的模块为流水线的第一个接收消息的模块
if upstream_tasks is None:
sink_job=config['tasks'][subkey].get('image')
print("The header job is "+sink_job)
messageRoute.sendmsg(dag_run_id,sink_job,message)
with open("/dag-yaml/csst-msc-l1-mbi.yml", "r", encoding='utf-8') as f:
data = yaml.safe_load(f)
for item in data:
tasks = item['dag']['tasks']
for task in tasks:
if 'dependencies' not in task:
print(f"任务 '{task['name']}' 没有 dependencies 字段。")
sink_job=task['image']
print("The header job is "+sink_job)
messageRoute.sendmsg(dag_run_id,sink_job,message)
@classmethod
def sendsinkjobs(self,dag_run_id,from_job,message):
with open('/dag-yaml/csst-msc-l1-mbi.yml', 'r') as file:
config = yaml.safe_load(file)
# 检索yaml文件中tasks下有多少模块
key_to_check = 'tasks'
subkey_count = 0
if key_to_check in config and isinstance(config[key_to_check], dict):
subkey_count = len(config[key_to_check])
print("The key "+key_to_check+" has "+str(subkey_count)+" subkeys. dag_run_id is "+dag_run_id)
if subkey_count == 0:
print("DAG file is not correct.")
return 104 #DAG文件异常
# 检索from_job在DAG中的task标签值
for i in range(subkey_count):
subkey = "task"+str(i+1)
if(from_job == config[key_to_check][subkey].get('image')):
target_value = subkey
for i in range(subkey_count):
subkey = "task"+str(i+1)
upstream_tasks = config[key_to_check][subkey].get('upstream_tasks')
# 不包含upstream_tasks的模块为流水线的第一个接收消息的模块
if upstream_tasks is None:
continue
# 包含upstream_tasks的模块,检索其值是否含有指定模块
else:
if target_value in upstream_tasks:
sink_job=config[key_to_check][subkey].get('image')
messageRoute.sendmsg(dag_run_id,sink_job,message)
#解析对应的DAG文件
from_job='QC0'
with open("csst-msc-l1-mbi.yml", "r", encoding='utf-8') as f:
data = yaml.safe_load(f)
for item in data:
tasks = item.get('dag', {}).get('tasks', [])
for task in tasks:
dependencies = task.get('dependencies', [])
if from_job in dependencies:
sink_job = task.get('image')
if sink_job:
messageRoute.sendmsg(dag_run_id,sink_job,message)
@classmethod
def sendmsg(self,dag_run_id,job,message):
#执行解包操作
print("sendmsg dag_run_id is "+dag_run_id)
messageRoute.append_dagrunid(dag_run_id)
message = "'"+message+"'"
command = f"scalebox task add --header dag_run_id={dag_run_id} --header repeatable=yes --upsert --sink-job={job} {message}"
print("command : "+command)
result=subprocess.run(command, shell=True)
......@@ -136,18 +97,3 @@ if __name__ == '__main__':
#如何接收到headers
w=messageRoute()
w.route_all(message,headers)
# DFS
CSST_DFS_GATEWAY=172.24.232.11:28000
CSST_DFS_GATEWAY=192.168.25.89:28000
CSST_DFS_TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ4ODU0NTA2NjQsInN1YiI6InN5c3RlbSJ9.POsuUABytu8-WMtZiYehiYEa5BnlgqNTXT6X3OTyix0
# CCDS
CCDS_SERVER_URL=http://172.24.232.11:29000
CCDS_SERVER_URL=http://192.168.25.89:29000
CCDS_USER=USER
CCDS_PASS=PASS
# VOLUMES
# /dfs_root:ro
CSST_DFS_ROOT=/data/dfs/files
# /ccds_root:ro
CCDS_ROOT=/data/ccds/files
# /pipeline/temp:rw
CSST_AST_TEMP=/data/ast_temp
# HARBOR
HARBOR=zjlab-harbor.csst.nao
CSST_DFS_ROOT=/nfs/dfs/dfs_root # /dfs_root:ro
CCDS_ROOT=/nfs/dfs/ccds_root # /ccds_root:ro
CSST_AST_TEMP=/nfs/pipeline-inttest/ast_temp # /pipeline/temp:rw
CSST_AUX_ROOT=/nfs/pipeline-inttest/aux # /pipeline/aux:ro
# TEST
CSST_INTTEST_ROOT=/data/pipeline-inttest
CSST_INTTEST_ROOT=/nfs/pipeline-inttest
VERBOSE=true
# DAG yaml /dag-yaml:ro
CSST_DAT_YAML_PATH=/data/scalebox/dag-yaml
CSST_DAT_YAML_PATH=/nfs/scalebox/dag-yaml
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment