Commit d3380b13 authored by Zhang Xiaoli's avatar Zhang Xiaoli
Browse files

调通mbi和sls流水线

parent 2097a0a0
......@@ -17,20 +17,22 @@ class messageRoute():
print("received redis-cli message")
headersstr = json.loads(headers)
dag_run_id=headersstr["dag_run_id"]
messageRoute.appready(dag_run_id,message)
sorted_tag=headersstr["sorted_tag"]
messageRoute.appready(sorted_tag,dag_run_id,message)
else:
try:
headersstr = json.loads(headers)
from_job=headersstr["from_job"]
#from_ip=headersstr["from_ip"]
dag_run_id=headersstr["dag_run_id"]
print("from_job :"+from_job+" dag_run_id :"+dag_run_id)
messageRoute.sendsinkjobs(dag_run_id,from_job,message)
sorted_tag=headersstr["sorted_tag"]
print("from_job :"+from_job+" dag_run_id :"+dag_run_id+" sorted_tag :"+sorted_tag)
messageRoute.sendsinkjobs(sorted_tag,dag_run_id,from_job,message)
except json.JSONDecodeError as e:
print("Invalid JSON format in headers:", e)
@classmethod
def appready(self,dag_run_id,message):
def appready(self,sorted_tag,dag_run_id,message):
#解析对应的DAG文件
with open("/dag-yaml/csst-msc-l1-mbi.yml", "r", encoding='utf-8') as f:
data = yaml.safe_load(f)
......@@ -40,10 +42,10 @@ class messageRoute():
print(f"任务 '{task['name']}' 没有 dependencies 字段。")
sink_job=task['image']
print("The header job is "+sink_job)
messageRoute.sendmsg(dag_run_id,sink_job,message)
messageRoute.sendmsg(sorted_tag,dag_run_id,sink_job,message)
@classmethod
def sendsinkjobs(self,dag_run_id,from_job,message):
def sendsinkjobs(self,sorted_tag,dag_run_id,from_job,message):
#解析对应的DAG文件
#from_job='csst-msc-l1-mbi'
with open("/dag-yaml/csst-msc-l1-mbi.yml", "r", encoding='utf-8') as f:
......@@ -60,7 +62,7 @@ class messageRoute():
if from_job in dependencies:
sink_job = task.get('image')
if sink_job:
messageRoute.sendmsg(dag_run_id,sink_job,message)
messageRoute.sendmsg(sorted_tag,dag_run_id,sink_job,message)
# def sendsinkjobs(self,dag_run_id,from_job,message):
# #解析对应的DAG文件
# from_job='MBI'
......@@ -75,12 +77,12 @@ class messageRoute():
# messageRoute.sendmsg(dag_run_id,sink_job,message)
@classmethod
def sendmsg(self,dag_run_id,job,message):
def sendmsg(self,sorted_tag,dag_run_id,job,message):
#执行解包操作
print("sendmsg dag_run_id is "+dag_run_id)
messageRoute.append_dagrunid(dag_run_id)
message = "'"+message+"'"
command = f"scalebox task add --header dag_run_id={dag_run_id} --header repeatable=yes --upsert --sink-job={job} {message}"
command = f"scalebox task add --header sorted_tag={sorted_tag} --header dag_run_id={dag_run_id} --header repeatable=yes --upsert --sink-job={job} {message}"
print("command : "+command)
result=subprocess.run(command, shell=True)
if result.returncode == 0:
......
CLUSTER=csst-csu
all: reset build
run:
# PGHOST=192.168.25.27 GRPC_SERVER=192.168.25.27 scalebox app create --env-file csu.env
scalebox app create --env-file scalebox.env
reset:
cd ${HOME}/docker-scalebox/clusters/$(CLUSTER) && make && cd -
down:
make -C ${HOME}/docker-scalebox/clusters/$(CLUSTER) down
list:
make -C ${HOME}/docker-scalebox/clusters/$(CLUSTER) list
# 预处理消息路由模块
以python语言编写
## 流水线结构
### 接收redis-cli消息,根据DAG yaml文件分发
- .tasks.task[n]不含upstream_tasks的模块为第一个接收消息的模块
- 检索.tasks.task[n]中upstream_tasks,检索模块间链接关系,将消息下发给下级模块
### qc0
### mbi
### mbi-photmix
\ No newline at end of file
name: csst-msc-l1-sls.apps2
label: 主巡天一级流水线(sls)
comment: 主巡天一级流水线
cluster: csst-csu
parameters:
initial_status: RUNNING
message_router: message-router-csst
jobs:
message-router-csst:
label: 主消息路由
base_image: csst/message-router-sls
# schedule_mode: HEAD
parameters:
tasks_per_queue: 500
#key_group_regex: ^(.+)$
#start_message: any
arguments:
task_timeout_seconds: 3000
max_sleep_count: 6000
slot_options: slot_on_head
environments:
- message=tar1266932744
- dataset_id=${DATASET_ID}
- star_s=${STAR_S}
- end_st=${END_ST}
- size=${SIZE}
# - PGHOST=10.3.10.28:9090
paths:
- ${CSST_DAT_YAML_PATH}:/dag-yaml
sink_vjobs:
- csst-msc-l1-sls
csst-msc-l1-sls:
label: 无缝光谱
base_image: cnic/csst-msc-l1-sls
# schedule_mode: HEAD
arguments:
# always_running: yes
# reserved_on_exit: yes
output_text_size: 100000
text_tranc_mode: TAIL
locale_mode: NONE
# grpc_server: 10.3.10.28:50051
parameters:
# start_message: 10160000068
key_group_regex: ^(.{6})(.{3})$
key_group_seq: 1,2
environments:
- CSST_DFS_GATEWAY=${CSST_DFS_GATEWAY}
- CSST_DFS_TOKEN=${CSST_DFS_TOKEN}
- CCDS_SERVER_URL=${CCDS_SERVER_URL}
# - PGHOST=10.3.10.28:9090
- TRACE=yes
paths:
- ${CSST_AUX_ROOT}:/pipeline/aux
- ${CSST_DFS_ROOT}:/dfs_root
- ${CCDS_ROOT}:/ccds_root
- ${CSST_AST_TEMP}:/pipeline/temp
# command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
hosts:
- h0:10
- c0:10
- c1:10
- c2:10
\ No newline at end of file
FROM hub.cstcloud.cn/scalebox/agent
LABEL maintainer="Zhang Xiaoli <zhangxiaoli@cnic.cn>"
# 安装python
RUN apt-get update \
&& apt-get install -y python3 python3-pip\
&& pip3 install pyyaml --break-system-packages\
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
COPY *.sh *.py /app/bin/
ENV WITH_HEADERS=yes
ENV ACTION_RUN=/app/bin/messageRoute.py
RUN chmod +x /app/bin/messageRoute.py
#ENV ACTION_RUN=/app/bin/run.py
#controld version 202208
#RUN chmod +x /app/bin/run.sh
IMAGE_NAME:=csst/message-router-sls
build:
docker build --network=host -t $(IMAGE_NAME) .
dist:
docker save $(IMAGE_NAME) | zstdmt | pv | ssh c0 'zstd -d | docker load'
push:
docker push $(IMAGE_NAME)
run:
docker run -it --entrypoint bash $(IMAGE_NAME)
down:
docker stop $(IMAGE_NAME)
name: csst-msc-l1-sls
tasks:
- name: SLS
image: csst-msc-l1-sls
- name: EXT0
dependencies: [SLS]
image: csst-msc-l1-sls-extraction-zero
#!/usr/bin/python3
#-*- coding:utf-8 -*-
import sys
import json
import subprocess
import yaml
class messageRoute():
def __init__(self):
self.message = ""
self.headers = ""
def route_all(self,message,headers):
if headers =="null" or "from_job" not in headers:
print("received redis-cli message")
headersstr = json.loads(headers)
dag_run_id=headersstr["dag_run_id"]
sorted_tag=headersstr["sorted_tag"]
messageRoute.appready(sorted_tag,dag_run_id,message)
else:
try:
headersstr = json.loads(headers)
from_job=headersstr["from_job"]
#from_ip=headersstr["from_ip"]
dag_run_id=headersstr["dag_run_id"]
sorted_tag=headersstr["sorted_tag"]
print("from_job :"+from_job+" dag_run_id :"+dag_run_id+" sorted_tag :"+sorted_tag)
messageRoute.sendsinkjobs(sorted_tag,dag_run_id,from_job,message)
except json.JSONDecodeError as e:
print("Invalid JSON format in headers:", e)
@classmethod
def appready(self,sorted_tag,dag_run_id,message):
#解析对应的DAG文件
with open("/dag-yaml/csst-msc-l1-sls.yml", "r", encoding='utf-8') as f:
data = yaml.safe_load(f)
tasks = data.get('tasks', [])
for task in tasks:
if 'dependencies' not in task:
print(f"任务 '{task['name']}' 没有 dependencies 字段。")
sink_job=task['image']
print("The header job is "+sink_job)
messageRoute.sendmsg(sorted_tag,dag_run_id,sink_job,message)
@classmethod
def sendsinkjobs(self,sorted_tag,dag_run_id,from_job,message):
#解析对应的DAG文件
#from_job='csst-msc-l1-sls'
with open("/dag-yaml/csst-msc-l1-sls.yml", "r", encoding='utf-8') as f:
data = yaml.safe_load(f)
tasks = data.get('tasks', [])
# 先找到 from_job 对应的 image
for task in tasks:
if task.get('image') == from_job:
from_job = task.get('name')
print("The header job is "+from_job)
break
for task in tasks:
dependencies = task.get('dependencies', [])
if from_job in dependencies:
sink_job = task.get('image')
if sink_job:
messageRoute.sendmsg(sorted_tag,dag_run_id,sink_job,message)
# def sendsinkjobs(self,dag_run_id,from_job,message):
# #解析对应的DAG文件
# from_job='MBI'
# with open("/dag-yaml/csst-msc-l1-sls.yml", "r", encoding='utf-8') as f:
# data = yaml.safe_load(f)
# tasks = data.get('tasks', [])
# for task in tasks:
# dependencies = task.get('dependencies', [])
# if from_job in dependencies:
# sink_job = task.get('image')
# if sink_job:
# messageRoute.sendmsg(dag_run_id,sink_job,message)
@classmethod
def sendmsg(self,sorted_tag,dag_run_id,job,message):
#执行解包操作
print("sendmsg dag_run_id is "+dag_run_id)
messageRoute.append_dagrunid(dag_run_id)
message = "'"+message+"'"
command = f"scalebox task add --header sorted_tag={sorted_tag} --header dag_run_id={dag_run_id} --header repeatable=yes --upsert --sink-job={job} {message}"
print("command : "+command)
result=subprocess.run(command, shell=True)
if result.returncode == 0:
print(f"send message {message} to {job}")
# dag_run_id记入/work/extra-attributes.txt
print("命令执行成功")
return result.returncode
else:
print(f"命令执行失败,返回码为: {result.returncode}")
return result.returncode
@classmethod
def append_dagrunid(self,dag_run_id):
file_path='/work/extra-attributes.txt'
content_to_append=f"dag_run_id:{dag_run_id}\n"
# 以追加模式打开文件
try:
with open(file_path, 'a', encoding='utf-8') as file:
# 追加内容
file.write(content_to_append)
print(f"追加完成,dag_run_id : {dag_run_id}")
except IOError as e:
print(f"写入文件/work/extra-attributes.txt时发生错误:{e}")
if __name__ == '__main__':
parameter = sys.argv
message=parameter[1]
headers=parameter[2]
print('message '+message)
print('headers '+headers)
#如何接收到headers
w=messageRoute()
w.route_all(message,headers)
#!/bin/bash
python3 /app/bin/messageRoute.py $*
\ No newline at end of file
# DFS
CSST_DFS_GATEWAY=192.168.25.89:28000
CSST_DFS_TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ4ODU0NTA2NjQsInN1YiI6InN5c3RlbSJ9.POsuUABytu8-WMtZiYehiYEa5BnlgqNTXT6X3OTyix0
# CCDS
CCDS_SERVER_URL=http://192.168.25.89:29000
CCDS_USER=USER
CCDS_PASS=PASS
# VOLUMES
CSST_DFS_ROOT=/nfs/dfs # /dfs_root:ro
CCDS_ROOT=/nfs/ccds # /ccds_root:ro
CSST_AST_TEMP=/nfs/pipeline-inttest/ast_temp # /pipeline/temp:rw
CSST_AUX_ROOT=/nfs/pipeline-inttest/aux # /pipeline/aux:ro
# TEST
CSST_INTTEST_ROOT=/nfs/pipeline-inttest
VERBOSE=true
# DAG yaml /dag-yaml:ro
CSST_DAT_YAML_PATH=/nfs/scalebox/dag-yaml
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment