Commit a9428011 authored by Zhang Xiaoli's avatar Zhang Xiaoli
Browse files

实现在工程云的测试,增加优先级设置

parent d671224f
name: csst-msc-l1-mbi.apps
name: csst-msc-l1-mbi.apps2
label: 主巡天一级流水线(mbi)
comment: 主巡天一级流水线
cluster: csst-csu
......@@ -29,12 +29,11 @@ jobs:
paths:
- ${CSST_DAT_YAML_PATH}:/dag-yaml
sink_vjobs:
- csst-msc-l1-qc0
- csst-msc-l1-mbi
- csst-msc-l1-mbi-photmix
csst-msc-l1-qc0:
label: QC0
base_image: cnic/csst-msc-l1-qc0
- csst-msc-l1-mbi-phot
csst-msc-l1-mbi:
label: 多色成像
base_image: cnic/csst-msc-l1-mbi
# schedule_mode: HEAD
arguments:
# always_running: yes
......@@ -42,7 +41,7 @@ jobs:
output_text_size: 100000
text_tranc_mode: TAIL
locale_mode: NONE
# max_sleep_count: 60000
# grpc_server: 10.3.10.28:50051
parameters:
# start_message: 10160000068
key_group_regex: ^(.{6})(.{3})$
......@@ -52,19 +51,23 @@ jobs:
- CSST_DFS_TOKEN=${CSST_DFS_TOKEN}
- CCDS_SERVER_URL=${CCDS_SERVER_URL}
# - PGHOST=10.3.10.28:9090
- TRACE=yes
paths:
- ${CSST_AUX_ROOT}:/pipeline/aux
- ${CSST_DFS_ROOT}:/dfs_root
- ${CCDS_ROOT}:/ccds_root
- ${CSST_AST_TEMP}:/pipeline/temp
# command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
# command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
sink_vjobs:
- csst-msc-l1-mbi
- csst-msc-l1-mbi-phot
hosts:
- h0:1
csst-msc-l1-mbi:
label: 多色成像
base_image: cnic/csst-msc-l1-mbi
- h0:1
# - c0:1
# - c1:1
# - c2:1
csst-msc-l1-mbi-phot:
label: 多色成像测光
base_image: cnic/csst-msc-l1-mbi-phot
# schedule_mode: HEAD
arguments:
# always_running: yes
......@@ -72,7 +75,7 @@ jobs:
output_text_size: 100000
text_tranc_mode: TAIL
locale_mode: NONE
# grpc_server: 10.3.10.28:50051
# grpc_server: 172.24.23.6:50051
parameters:
# start_message: 10160000068
key_group_regex: ^(.{6})(.{3})$
......@@ -82,44 +85,16 @@ jobs:
- CSST_DFS_TOKEN=${CSST_DFS_TOKEN}
- CCDS_SERVER_URL=${CCDS_SERVER_URL}
# - PGHOST=10.3.10.28:9090
- TRACE=yes
paths:
- ${CSST_AUX_ROOT}:/pipeline/aux
- ${CSST_DFS_ROOT}:/dfs_root
- ${CCDS_ROOT}:/ccds_root
- ${CSST_AST_TEMP}:/pipeline/temp
# command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
# sink_vjobs:
# - csst-msc-l1-mbi-photmix
# command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
hosts:
- h0:1
# - n0:1
# - n1:1
# csst-msc-l1-mbi-photmix:
# label: 多色成像测光
# base_image: cnic/csst-msc-l1-mbi-photmix
# # schedule_mode: HEAD
# arguments:
# # always_running: yes
# # reserved_on_exit: yes
# output_text_size: 100000
# text_tranc_mode: TAIL
# locale_mode: NONE
# # grpc_server: 172.24.23.6:50051
# parameters:
# # start_message: 10160000068
# key_group_regex: ^(.{6})(.{3})$
# key_group_seq: 1,2
# environments:
# - CSST_DFS_GATEWAY=${CSST_DFS_GATEWAY}
# - CSST_DFS_TOKEN=${CSST_DFS_TOKEN}
# - CCDS_SERVER_URL=${CCDS_SERVER_URL}
# # - PGHOST=10.3.10.28:9090
# paths:
# - ${CSST_AUX_ROOT}:/pipeline/aux
# - ${CSST_DFS_ROOT}:/dfs_root
# - ${CCDS_ROOT}:/ccds_root
# - ${CSST_AST_TEMP}:/pipeline/temp
# # command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
# hosts:
# - h0:1
- h0:1
# - c0:1
# - c1:1
# - c2:1
\ No newline at end of file
- dag_id: csst-msc-l1-mbi
dag:
tasks:
- name: QC0
image: csst-msc-l1-qc0
- name: MBI
dependencies: [QC0]
image: csst-msc-l1-mbi
- name: AST
dependencies: [QC0]
image: csst-msc-l1-ast
- name: SSO
dependencies: [AST]
template: csst-msc-l1-ast-sso
\ No newline at end of file
name: csst-msc-l1-mbi
tasks:
- name: MBI
image: csst-msc-l1-mbi
- name: PHOT
dependencies: [MBI]
image: csst-msc-l1-mbi-phot
\ No newline at end of file
......@@ -10,6 +10,7 @@ class messageRoute():
self.message = ""
self.headers = ""
def route_all(self,message,headers):
if headers =="null" or "from_job" not in headers:
......@@ -33,30 +34,46 @@ class messageRoute():
#解析对应的DAG文件
with open("/dag-yaml/csst-msc-l1-mbi.yml", "r", encoding='utf-8') as f:
data = yaml.safe_load(f)
for item in data:
tasks = item['dag']['tasks']
for task in tasks:
if 'dependencies' not in task:
print(f"任务 '{task['name']}' 没有 dependencies 字段。")
sink_job=task['image']
print("The header job is "+sink_job)
messageRoute.sendmsg(dag_run_id,sink_job,message)
tasks = data.get('tasks', [])
for task in tasks:
if 'dependencies' not in task:
print(f"任务 '{task['name']}' 没有 dependencies 字段。")
sink_job=task['image']
print("The header job is "+sink_job)
messageRoute.sendmsg(dag_run_id,sink_job,message)
@classmethod
def sendsinkjobs(self,dag_run_id,from_job,message):
#解析对应的DAG文件
from_job='QC0'
with open("csst-msc-l1-mbi.yml", "r", encoding='utf-8') as f:
#from_job='csst-msc-l1-mbi'
with open("/dag-yaml/csst-msc-l1-mbi.yml", "r", encoding='utf-8') as f:
data = yaml.safe_load(f)
for item in data:
tasks = item.get('dag', {}).get('tasks', [])
for task in tasks:
dependencies = task.get('dependencies', [])
if from_job in dependencies:
sink_job = task.get('image')
if sink_job:
messageRoute.sendmsg(dag_run_id,sink_job,message)
tasks = data.get('tasks', [])
# 先找到 from_job 对应的 image
for task in tasks:
if task.get('image') == from_job:
from_job = task.get('name')
print("The header job is "+from_job)
break
for task in tasks:
dependencies = task.get('dependencies', [])
if from_job in dependencies:
sink_job = task.get('image')
if sink_job:
messageRoute.sendmsg(dag_run_id,sink_job,message)
# def sendsinkjobs(self,dag_run_id,from_job,message):
# #解析对应的DAG文件
# from_job='MBI'
# with open("/dag-yaml/csst-msc-l1-mbi.yml", "r", encoding='utf-8') as f:
# data = yaml.safe_load(f)
# tasks = data.get('tasks', [])
# for task in tasks:
# dependencies = task.get('dependencies', [])
# if from_job in dependencies:
# sink_job = task.get('image')
# if sink_job:
# messageRoute.sendmsg(dag_run_id,sink_job,message)
@classmethod
def sendmsg(self,dag_run_id,job,message):
#执行解包操作
......@@ -86,7 +103,7 @@ class messageRoute():
file.write(content_to_append)
print(f"追加完成,dag_run_id : {dag_run_id}")
except IOError as e:
print(f"写入文件/work/extra-attributes.txt时发生错误{e}")
print(f"写入文件/work/extra-attributes.txt时发生错误:{e}")
if __name__ == '__main__':
parameter = sys.argv
......
......@@ -8,8 +8,8 @@ CCDS_USER=USER
CCDS_PASS=PASS
# VOLUMES
CSST_DFS_ROOT=/nfs/dfs/dfs_root # /dfs_root:ro
CCDS_ROOT=/nfs/dfs/ccds_root # /ccds_root:ro
CSST_DFS_ROOT=/nfs/dfs # /dfs_root:ro
CCDS_ROOT=/nfs/ccds # /ccds_root:ro
CSST_AST_TEMP=/nfs/pipeline-inttest/ast_temp # /pipeline/temp:rw
CSST_AUX_ROOT=/nfs/pipeline-inttest/aux # /pipeline/aux:ro
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment