diff --git a/L1/pipelines/csst-msc-l1-mbi/Makefile b/L1/pipelines/csst-msc-l1-mbi/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..11faae9943755571d33a602844ca53f92a409145 --- /dev/null +++ b/L1/pipelines/csst-msc-l1-mbi/Makefile @@ -0,0 +1,16 @@ +CLUSTER=csst-nao + +all: reset build + +run: +# PGHOST=192.168.25.27 GRPC_SERVER=192.168.25.27 scalebox app create --env-file csu.env + PGHOST=10.3.10.28:9090 scalebox app create --env-file scalebox.env +reset: + cd ${HOME}/docker-scalebox/clusters/$(CLUSTER) && make && cd - + +down: + make -C ${HOME}/docker-scalebox/clusters/$(CLUSTER) down + +list: + make -C ${HOME}/docker-scalebox/clusters/$(CLUSTER) list + diff --git a/L1/pipelines/csst-msc-l1-mbi/README.md b/L1/pipelines/csst-msc-l1-mbi/README.md new file mode 100644 index 0000000000000000000000000000000000000000..89baa3f304a2eb56a2f8bf186638015674bd6403 --- /dev/null +++ b/L1/pipelines/csst-msc-l1-mbi/README.md @@ -0,0 +1,16 @@ +# 预处理消息路由模块 + +以python语言编写 + +## 流水线结构 + +### 接收redis-cli消息,根据DAG yaml文件分发 + + - .tasks.task[n]不含upstream_tasks的模块为第一个接收消息的模块 + - 检索.tasks.task[n]中upstream_tasks,检索模块间链接关系,将消息下发给下级模块 + +### qc0 + +### mbi + +### mbi-photmix \ No newline at end of file diff --git a/L1/pipelines/csst-msc-l1-mbi/app.yaml b/L1/pipelines/csst-msc-l1-mbi/app.yaml new file mode 100644 index 0000000000000000000000000000000000000000..eae54573dec75263a06ed45f2bfcb078d42e0f93 --- /dev/null +++ b/L1/pipelines/csst-msc-l1-mbi/app.yaml @@ -0,0 +1,135 @@ +name: csst-msc-l1-mbi.app.process +label: 主巡天一级流水线(mbi) +comment: 主巡天一级流水线 +cluster: csst-nao +parameters: + initial_status: RUNNING + message_router: message-router-csst + +jobs: + message-router-csst: + label: 主消息路由 + base_image: csst/message-router-mbi + # schedule_mode: HEAD + parameters: + tasks_per_queue: 500 + #key_group_regex: ^(.+)$ + #start_message: any + arguments: + max_sleep_count: 6000 + slot_options: slot_on_head + environments: + - message=tar1266932744 + - dataset_id=${DATASET_ID} + - star_s=${STAR_S} + - end_st=${END_ST} + - size=${SIZE} + # - PGHOST=10.3.10.28:9090 + paths: + - ${CSST_DAT_YAML_PATH}:/dag-yaml + sink_vjobs: + - csst-msc-l1-qc0 + - csst-msc-l1-mbi + - csst-msc-l1-mbi-photmix + csst-msc-l1-qc0: + label: QC0 + base_image: cnic/csst-msc-l1-qc0 + # schedule_mode: HEAD + arguments: + # always_running: yes + # reserved_on_exit: yes + output_text_size: 100000 + text_tranc_mode: TAIL + locale_mode: NONE + # max_sleep_count: 60000 + # grpc_server: 172.24.23.6:50051 + parameters: + # start_message: 10160000068 + key_group_regex: ^(.{6})(.{3})$ + key_group_seq: 1,2 + environments: + - CSST_DFS_API_MODE=${CSST_DFS_API_MODE} + - CSST_DFS_GATEWAY=${CSST_DFS_GATEWAY} + - CSST_DFS_APP_ID=${CSST_DFS_APP_ID} + - CSST_DFS_APP_TOKEN=${CSST_DFS_APP_TOKEN} + - CCDS_SERVER_URL=${CCDS_SERVER_URL} + # - PGHOST=10.3.10.28:9090 + paths: + - ${CSST_AUX_ROOT}:/pipeline/aux + - ${CSST_DFS_ROOT}:/dfs_root + - ${CCDS_ROOT}:/ccds_root + - ${CSST_AST_TEMP}:/pipeline/temp + # command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE% + sink_vjobs: + - csst-msc-l1-mbi + hosts: + - h0:1 + csst-msc-l1-mbi: + label: 多色成像 + base_image: cnic/csst-msc-l1-mbi + # schedule_mode: HEAD + arguments: + # always_running: yes + # reserved_on_exit: yes + output_text_size: 100000 + text_tranc_mode: TAIL + locale_mode: NONE + # grpc_server: 10.3.10.28:50051 + parameters: + # start_message: 10160000068 + key_group_regex: ^(.{6})(.{3})$ + key_group_seq: 1,2 + environments: + - CSST_DFS_API_MODE=${CSST_DFS_API_MODE} + - CSST_DFS_GATEWAY=${CSST_DFS_GATEWAY} + - CSST_DFS_APP_ID=${CSST_DFS_APP_ID} + - CSST_DFS_APP_TOKEN=${CSST_DFS_APP_TOKEN} + - CCDS_SERVER_URL=${CCDS_SERVER_URL} + # - PGHOST=10.3.10.28:9090 + paths: + - ${CSST_AUX_ROOT}:/pipeline/aux + - ${CSST_DFS_ROOT}:/dfs_root + - ${CCDS_ROOT}:/ccds_root + - ${CSST_AST_TEMP}:/pipeline/temp + # command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE% + sink_vjobs: + - csst-msc-l1-mbi-photmix + hosts: + - h0:1 + # - c0:10 + # - c1:1 + # - c2:1 + csst-msc-l1-mbi-photmix: + label: 多色成像测光 + base_image: cnic/csst-msc-l1-mbi-photmix + # schedule_mode: HEAD + arguments: + # always_running: yes + # reserved_on_exit: yes + output_text_size: 100000 + text_tranc_mode: TAIL + locale_mode: NONE + # grpc_server: 172.24.23.6:50051 + parameters: + # start_message: 10160000068 + key_group_regex: ^(.{6})(.{3})$ + key_group_seq: 1,2 + environments: + - CSST_DFS_API_MODE=${CSST_DFS_API_MODE} + - CSST_DFS_GATEWAY=${CSST_DFS_GATEWAY} + - CSST_DFS_APP_ID=${CSST_DFS_APP_ID} + - CSST_DFS_APP_TOKEN=${CSST_DFS_APP_TOKEN} + - CCDS_SERVER_URL=${CCDS_SERVER_URL} + # - PGHOST=10.3.10.28:9090 + paths: + - ${CSST_AUX_ROOT}:/pipeline/aux + - ${CSST_DFS_ROOT}:/dfs_root + - ${CCDS_ROOT}:/ccds_root + - ${CSST_AST_TEMP}:/pipeline/temp + # command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE% + hosts: + - h0:1 + # - c0:10 + # - c1:1 + # - c2:1 + \ No newline at end of file diff --git a/L1/pipelines/csst-msc-l1-mbi/message-router/Dockerfile b/L1/pipelines/csst-msc-l1-mbi/message-router/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..320c98c577396cf8cbb9cc360002fd2412e75813 --- /dev/null +++ b/L1/pipelines/csst-msc-l1-mbi/message-router/Dockerfile @@ -0,0 +1,18 @@ +FROM hub.cstcloud.cn/scalebox/agent + +LABEL maintainer="Zhang Xiaoli " + +# 安装python +RUN apt-get update \ + && apt-get install -y python3 python3-pip\ + && pip3 install pyyaml --break-system-packages\ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +COPY *.sh *.py /app/bin/ +ENV WITH_HEADERS=yes +ENV ACTION_RUN=/app/bin/messageRoute.py +RUN chmod +x /app/bin/messageRoute.py +#ENV ACTION_RUN=/app/bin/run.py +#controld version 202208 +#RUN chmod +x /app/bin/run.sh diff --git a/L1/pipelines/csst-msc-l1-mbi/message-router/Makefile b/L1/pipelines/csst-msc-l1-mbi/message-router/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..e05e2f562eddee3b00019fd113aefc04ce92e6b9 --- /dev/null +++ b/L1/pipelines/csst-msc-l1-mbi/message-router/Makefile @@ -0,0 +1,14 @@ +IMAGE_NAME:=csst/message-router-mbi + +build: + docker build --network=host -t $(IMAGE_NAME) . + +dist: + docker save $(IMAGE_NAME) | zstdmt | pv | ssh c0 'zstd -d | docker load' + +push: + docker push $(IMAGE_NAME) +run: + docker run -it --entrypoint bash $(IMAGE_NAME) +down: + docker stop $(IMAGE_NAME) diff --git a/L1/pipelines/csst-msc-l1-mbi/message-router/dag-yaml/csst-msc-l1-mbi.yml b/L1/pipelines/csst-msc-l1-mbi/message-router/dag-yaml/csst-msc-l1-mbi.yml new file mode 100644 index 0000000000000000000000000000000000000000..46b269be4a8740d5248dd41416e3be4789012180 --- /dev/null +++ b/L1/pipelines/csst-msc-l1-mbi/message-router/dag-yaml/csst-msc-l1-mbi.yml @@ -0,0 +1,22 @@ +# 定义DAG的基本信息 +dag_id: "csst-msc-l1-mbi" + +# 定义任务 +tasks: + task1: + image: csst-msc-l1-qc0 + tag: latest + force_pull: true + bash_command: "python /pipeline/app/run.py {message}" + task2: + image: csst-msc-l1-mbi + tag: latest + force_pull: true + bash_command: "python /pipeline/app/run.py {message}" + upstream_tasks: ["task1"] # 定义依赖关系,task2依赖task1 + task3: + image: csst-msc-l1-mbi-photmix + tag: latest + force_pull: true + bash_command: "python /pipeline/app/run.py {message}" + upstream_tasks: ["task2"] # 定义依赖关系,task3依赖task2 diff --git a/L1/pipelines/csst-msc-l1-mbi/message-router/messageRoute.py b/L1/pipelines/csst-msc-l1-mbi/message-router/messageRoute.py new file mode 100644 index 0000000000000000000000000000000000000000..9563bb506d9dd7c99721e36145d77653a0fe2fde --- /dev/null +++ b/L1/pipelines/csst-msc-l1-mbi/message-router/messageRoute.py @@ -0,0 +1,133 @@ +#!/usr/bin/python3 +#-*- coding:utf-8 -*- + +import sys +import json +import subprocess +import yaml +class messageRoute(): + def __init__(self): + self.message = "" + self.headers = "" + + def route_all(self,message,headers): + + if headers =="null" or "from_job" not in headers: + print("received redis-cli message") + messageRoute.appready(message) + else: + try: + headersstr = json.loads(headers) + from_job=headersstr["from_job"] + #from_ip=headersstr["from_ip"] + print("from_job :"+from_job) + messageRoute.sendsinkjobs(from_job,message) + + except json.JSONDecodeError as e: + print("Invalid JSON format in headers:", e) + @classmethod + def appready(self,message): + #解析对应的DAG文件 + with open('/dag-yaml/csst-msc-l1-mbi.yml', 'r', encoding='utf-8') as file: + config = yaml.safe_load(file) + # 打印解析后的数据 + #print(config) + + # 检索yaml文件中tasks下有多少模块 + key_to_check = 'tasks' + subkey_count = 0 + if key_to_check in config and isinstance(config[key_to_check], dict): + subkey_count = len(config[key_to_check]) + + print("The key "+key_to_check+" has "+str(subkey_count)+" subkeys.") + + if subkey_count == 0: + print("DAG file is not correct.") + else: + for i in range(subkey_count): + + subkey = "task"+str(i+1) + upstream_tasks = config['tasks'][subkey].get('upstream_tasks') + + # print("subkey "+subkey) + + # 不包含upstream_tasks的模块为流水线的第一个接收消息的模块 + if upstream_tasks is None: + sink_job=config['tasks'][subkey].get('image') + print("The header job is "+sink_job) + messageRoute.sendmsg(sink_job,message) + + @classmethod + def sendsinkjobs(self,from_job,message): + with open('/dag-yaml/csst-msc-l1-mbi.yml', 'r') as file: + config = yaml.safe_load(file) + + # 检索yaml文件中tasks下有多少模块 + key_to_check = 'tasks' + subkey_count = 0 + if key_to_check in config and isinstance(config[key_to_check], dict): + subkey_count = len(config[key_to_check]) + + print("The key "+key_to_check+" has "+str(subkey_count)+" subkeys.") + + if subkey_count == 0: + print("DAG file is not correct.") + return 104 #DAG文件异常 + + # 检索from_job在DAG中的task标签值 + for i in range(subkey_count): + subkey = "task"+str(i+1) + if(from_job == config[key_to_check][subkey].get('image')): + target_value = subkey + + for i in range(subkey_count): + + subkey = "task"+str(i+1) + upstream_tasks = config[key_to_check][subkey].get('upstream_tasks') + + # 不包含upstream_tasks的模块为流水线的第一个接收消息的模块 + if upstream_tasks is None: + continue + # 包含upstream_tasks的模块,检索其值是否含有指定模块 + else: + if target_value in upstream_tasks: + sink_job=config[key_to_check][subkey].get('image') + messageRoute.sendmsg(sink_job,message) + + @classmethod + def sendmsg(self,job,message): + #执行解包操作 + command = f"scalebox task add -sink-job={job} {message}" + result=subprocess.run(command, shell=True) + if result.returncode == 0: + print(f"send message {message} to {job}") + print("命令执行成功") + return result.returncode + else: + print(f"命令执行失败,返回码为: {result.returncode}") + return result.returncode + +if __name__ == '__main__': + parameter = sys.argv + message=parameter[1] + headers=parameter[2] + print('message '+message) + print('headers '+headers) + #如何接收到headers + w=messageRoute() + w.route_all(message,headers) + + + + + + + + + + + + + + + diff --git a/L1/pipelines/csst-msc-l1-mbi/message-router/run.sh b/L1/pipelines/csst-msc-l1-mbi/message-router/run.sh new file mode 100644 index 0000000000000000000000000000000000000000..64d8a754fc57da3d3814ecb8b275215afbf927d6 --- /dev/null +++ b/L1/pipelines/csst-msc-l1-mbi/message-router/run.sh @@ -0,0 +1,2 @@ +#!/bin/bash +python3 /app/bin/messageRoute.py $* \ No newline at end of file diff --git a/L1/pipelines/csst-msc-l1-mbi/scalebox.env b/L1/pipelines/csst-msc-l1-mbi/scalebox.env new file mode 100644 index 0000000000000000000000000000000000000000..bc56a11b18f9fa9798d8149214f7e453ccb5c416 --- /dev/null +++ b/L1/pipelines/csst-msc-l1-mbi/scalebox.env @@ -0,0 +1,24 @@ +# DFS +CSST_DFS_GATEWAY=10.3.10.28:30880 +CSST_DFS_API_MODE=cluster +CSST_DFS_APP_ID=test +CSST_DFS_APP_TOKEN=test +CSST_DFS_LOGS_DIR="." + +# CCDS +CCDS_SERVER_URL=http://10.3.10.28:29000 +CCDS_USER=USER +CCDS_PASS=PASS + +# VOLUMES +# /dfs_root:ro +CSST_DFS_ROOT=/nfsdata/share/dfs/dfs_root +# /ccds_root:ro +CCDS_ROOT=/nfsdata/share/dfs/ccds_root +# /pipeline/aux:ro +CSST_AUX_ROOT=/nfsdata/share/pipeline-inttest/aux +# /pipeline/temp:rw +CSST_AST_TEMP=/nfsdata/share/pipeline-inttest/ast_temp + +# DAG yaml /dag-yaml:ro +CSST_DAT_YAML_PATH=/nfsdata/share/dag-yaml