Commit 7ff66aa8 authored by Zhang Xiaoli's avatar Zhang Xiaoli
Browse files

remove redundant modules

parent 6dae7ea8
This diff is collapsed.
#!/bin/bash
# Run the CSST L1 MBI pipeline for a single observation.
#
# $1: comma-separated triple "INPUT_DIR,OUTPUT_DIR,N", e.g.
#   test202210/P01_N6397_030s_Astrometry-off/MSC_0000000/,test202210/P01_N6397_030s_Astrometry-off/MSC_0000000_L1/,4
input=$1
# FIX: split on commas only. The previous `tr "," " "` + word-splitting also
# split on embedded spaces, silently corrupting paths that contain blanks.
IFS=',' read -r -a arr <<< "$input"
echo "${arr[0]}"
echo "${arr[1]}"
echo "${arr[2]}"
python /L1Pipeline/build/csst_l1/app/l1_mbi_local.py \
    "/share/OnOrbitCal/SimData/${arr[0]}" \
    "/share/OnOrbitCal/SimData/${arr[1]}" \
    "${arr[2]}" > /work/stdout 2> /work/stderr
exit_code=$?
# Forward the captured stdout/stderr to the scalebox agent log files.
echo "input : " "${arr[1]}" >> /var/log/scalebox/stdout
cat /work/stdout >> /var/log/scalebox/stdout
echo "input : " "${arr[1]}" >> /var/log/scalebox/stderr
cat /work/stderr >> /var/log/scalebox/stderr
rm -f /work/stdout /work/stderr
# Keep only the last 100 lines of csst-l1mod.log in a temporary file.
touch "/share/OnOrbitCal/SimData/${arr[1]}csst-l1mod-tmp.log"
tail -n 100 "/share/OnOrbitCal/SimData/${arr[1]}csst-l1mod.log" \
    > "/share/OnOrbitCal/SimData/${arr[1]}csst-l1mod-tmp.log"
# Emit the pipeline logs to stdout/stderr and append them to a per-run log.
cat "/share/OnOrbitCal/SimData/${arr[1]}csst-l1ppl.log" "/share/OnOrbitCal/SimData/${arr[1]}csst-l1mod-tmp.log"
cat "/share/OnOrbitCal/SimData/${arr[1]}csst-l1ppl.log" "/share/OnOrbitCal/SimData/${arr[1]}csst-l1mod-tmp.log" \
    >> "/var/log/scalebox/${arr[1]}csst_l1.log"
echo finish crds.
# Propagate the pipeline's exit status to the agent.
exit $exit_code
\ No newline at end of file
# App-level Makefile: manage the scalebox application on the target cluster.
CLUSTER=csst

# None of these targets produce files — declare them phony so a stray file
# named e.g. `list` cannot shadow them.
.PHONY: all run reset down list

# NOTE(review): `build` is not defined in this Makefile — confirm it comes
# from an include or is a leftover.
all: reset build

# Create the scalebox application from the app description in this directory.
run:
	scalebox app create

# Rebuild/restart the cluster definition. Use $(MAKE) -C instead of
# `cd … && make && cd -` so -j/-n flags propagate and no directory change
# leaks between recipe lines.
reset:
	$(MAKE) -C ${HOME}/docker-scalebox/clusters/$(CLUSTER)

down:
	$(MAKE) -C ${HOME}/docker-scalebox/clusters/$(CLUSTER) down

list:
	$(MAKE) -C ${HOME}/docker-scalebox/clusters/$(CLUSTER) list
# Scalebox application definition: CRDS calibration service for the CSST
# L1 pipeline on the `csst` cluster.
name: crds.app.process
label: csst-crds
cluster: csst
parameters:
# Start the application in RUNNING state immediately.
initial_status: RUNNING
jobs:
# NOTE(review): the redis-cli and admL1 jobs below are disabled leftovers
# from a previous topology (see commit message "remove redundant modules").
# redis-cli:
# label: 消息队列接受模块
# base_image: csst/redis-cli
# schedule_mode: HEAD
# # parameters:
# # start_message: abcd
# sink_jobs:
# - crds
# admL1:
# label: 1级流水线消息网关
# base_image: csst/adml1
# schedule_mode: HEAD
# # command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
# sink_jobs:
# - crds
# The single active job: runs the csst/crds image on the head node.
crds:
label: CRDS定标天区
base_image: csst/crds
schedule_mode: HEAD
# Host-to-container mounts. AUX_DIR/CRDS_DIR/DFS_ROOT are supplied by the
# app environment — presumably from the *.env files in this repo; verify
# which one is passed to `scalebox app create`.
paths:
- ${AUX_DIR}:/L1Pipeline/aux
- ${CRDS_DIR}:/L1Pipeline/aux/C6.1_ref_crds
- ${DFS_ROOT}:/dfsroot
- /sharewcl:/share
# sink_jobs:
#hosts:
# - h0:1
# - c0:3
# - c1:3
# - c2:3
# Data locations for the goosefs-backed deployment — presumably the Tencent
# cloud cluster; confirm against the deployment docs.
AUX_DIR=/goosefsx/x_c60_o19xp6c1_proxy/L1Pipeline/aux
CRDS_DIR=/goosefsx/x_c60_o19xp6c1_proxy/L1Pipeline/aux/products_ref20_3hdr
DFS_ROOT=/goosefsx/x_c60_o19xp6c1_proxy/dfs_root
# Data locations for the /sharewcl-backed deployment (same variable names as
# the goosefs env file; only one env file is loaded per app run).
AUX_DIR=/sharewcl/pipeline/aux
CRDS_DIR=/sharewcl/OnOrbitCal/SimData/ref_202211/products_ref20_3hdr
DFS_ROOT=/sharewcl/dfs
# Stage 1: build the cron binary from the Go sources in this directory.
# Naming the stage (instead of referring to it by index 0) keeps the
# COPY --from reference stable if stages are ever added or reordered.
FROM golang:1.20.3 AS builder
ARG GOPROXY=https://goproxy.cn
COPY . /src/
# Cache Go's package directory across builds (requires DOCKER_BUILDKIT=1,
# which the accompanying Makefile sets).
RUN --mount=type=cache,target=/go/pkg \
    cd /src && go build main.go && strip main

# Stage 2: ship only the binary on top of the scalebox agent image.
FROM hub.cstcloud.cn/scalebox/agent
COPY --from=builder /src/main /app/bin/cron
# ACTION_RUN: command the agent executes — presumably on each incoming
# message; confirm against the scalebox agent documentation.
ENV ACTION_RUN=/app/bin/cron
ENTRYPOINT ["goagent"]
IMAGE_NAME:=hub.cstcloud.cn/scalebox/cron

# All targets are commands, not files.
.PHONY: build push clean run

# BuildKit is required for the `RUN --mount=type=cache` in the Dockerfile.
build:
	DOCKER_BUILDKIT=1 docker build --network=host -t $(IMAGE_NAME) .

push:
	docker push $(IMAGE_NAME)

clean:
	docker rmi $(IMAGE_NAME)

run:
	scalebox app create
# cron module
## Introduction
The cron module is a public scalebox module, similar to the UNIX cron utility: it periodically sends messages to each sub-module to trigger its scheduled operations.
## Usage
Define timing operations for multiple modules through the file cron.txt.
In the cron.txt file, lines beginning with '#' are comment lines.
Each line defines the timing operation for a module, including timing operation interval definition and module name, separated by commas.
A sample cron.txt file is as follows:
```
# comments for cron.txt
@every 1m,mod0
@every 1m30s,mod1
```
The definition part of the timing operation interval refers to:
[cron-doc](https://pkg.go.dev/github.com/robfig/cron)
# Scalebox application definition demonstrating the cron module: one cron
# job fans timed messages out to two placeholder agent modules.
name: cron.scalebox
cluster: local
parameters:
# Start the application in RUNNING state immediately.
initial_status: RUNNING
jobs:
# NOTE(review): the job name `filelist` looks like a copy-paste leftover —
# it actually runs the cron image.
filelist:
base_image: hub.cstcloud.cn/scalebox/cron
schedule_mode: HEAD
parameters:
start_message: ANY
# The schedule definitions are read from /cron.txt inside the container.
paths:
- ${PWD}/cron.txt:/cron.txt:ro
# Modules that may be targeted by entries in cron.txt.
sink_jobs:
- mod0
- mod1
mod0:
base_image: hub.cstcloud.cn/scalebox/agent
mod1:
base_image: hub.cstcloud.cn/scalebox/agent
# cron.txt: one "<schedule>,<module>" entry per line; lines starting with '#' are comments
@every 1m,redis-cli
# @every 1m30s,mod1
\ No newline at end of file
module cron
go 1.19
require github.com/robfig/cron/v3 v3.0.1
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
package main
import (
"bufio"
"fmt"
"os"
"os/exec"
"strings"
"time"
"github.com/robfig/cron/v3"
)
// cronItem is one parsed, non-comment line of /cron.txt: a cron schedule
// expression (robfig/cron syntax, e.g. "@every 1m") and the name of the
// module to message on each tick.
type cronItem struct {
	cronText string // cron schedule expression (first comma-separated field)
	modName  string // target module name (second comma-separated field)
}
// main reads /cron.txt, registers a cron schedule for each entry, and on
// every tick runs `send-job-message <module> <timestamp>` so the target
// module starts its periodic work. It then blocks forever.
//
// Exit codes: 1 = cannot open /cron.txt, 2 = read error,
// 3 = invalid cron schedule expression.
func main() {
	var cronItems []cronItem

	file, err := os.Open("/cron.txt")
	if err != nil {
		fmt.Fprintf(os.Stderr, "Open file error:%v", err)
		os.Exit(1)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		s := strings.TrimSpace(scanner.Text())
		// Skip blank lines and '#' comment lines.
		if len(s) == 0 || s[0] == '#' {
			continue
		}
		ss := strings.Split(s, ",")
		if len(ss) != 2 {
			fmt.Fprintf(os.Stderr, "format error: %s\n", s)
			continue
		}
		cronItems = append(cronItems, cronItem{cronText: ss[0], modName: ss[1]})
	}
	if err := scanner.Err(); err != nil {
		fmt.Fprintf(os.Stderr, "scan file error:%v", err)
		os.Exit(2)
	}

	c := cron.New()
	for _, item := range cronItems {
		// Copy into a loop-local variable: before Go 1.22 (go.mod says
		// go 1.19) the range variable is reused across iterations, so
		// every closure would otherwise see the last item.
		modName := item.modName
		// FIX: AddFunc's error was previously discarded, so a malformed
		// schedule expression was silently ignored. Fail loudly instead.
		if _, err := c.AddFunc(item.cronText, func() {
			current := time.Now().Format("2006-01-02T15:04:05")
			cmd := exec.Command("send-job-message", modName, current)
			output, err := cmd.CombinedOutput()
			if err != nil {
				fmt.Fprintf(os.Stderr, "send-job-message error:%v", err)
			}
			fmt.Println(string(output))
		}); err != nil {
			fmt.Fprintf(os.Stderr, "invalid cron spec %q: %v\n", item.cronText, err)
			os.Exit(3)
		}
	}
	c.Start()

	// Block the main goroutine forever; cron callbacks run in their own
	// goroutines.
	select {}
}
# Scalebox wrapper image for the CSST CPIC L1 QC0 pipeline step.
FROM csu-harbor.csst.nao:10443/csst/csst-cpic-l1-qc0:latest
USER root

# Install jq — run.sh uses it to parse the JSON message headers.
RUN apt-get update \
    && apt-get install -y --no-install-recommends jq \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

COPY run.sh /app/bin/
RUN chmod +x /app/bin/run.sh

# Bring the scalebox agent toolchain (goagent, helper binaries) into this image.
COPY --from=hub.cstcloud.cn/scalebox/agent /usr/local /usr/local

# Working area owned by the unprivileged pipeline user. A single RUN keeps
# the layer count down versus the previous three separate instructions.
RUN mkdir /work/ \
    && chown -R csst:csst /work /pipeline

WORKDIR /work/
USER csst
ENTRYPOINT ["goagent"]
IMAGE_NAME:=csu-harbor.csst.nao:10443/cnic/csst-cpic-l1-qc0
TAG?=latest

# All targets are commands, not files.
.PHONY: all build push dist run down scp

all: build push dist

# Refresh the base image, then build the wrapper image.
build:
	docker pull csu-harbor.csst.nao:10443/csst/csst-cpic-l1-qc0:$(TAG)
	docker build --network=host -t $(IMAGE_NAME) .

push:
	docker push $(IMAGE_NAME)

# Pre-pull the image on each worker node.
dist:
	ssh sc1 docker pull $(IMAGE_NAME)
	ssh sc2 docker pull $(IMAGE_NAME)
	ssh sc3 docker pull $(IMAGE_NAME)

# Open an interactive shell in the image for debugging.
run:
	docker run -it --entrypoint bash $(IMAGE_NAME)

# NOTE(review): `docker stop` expects a container name/ID, not an image
# name — verify this target actually stops anything.
down:
	docker stop $(IMAGE_NAME)

scp:
	scp -r ./ csst-csu:/root/csst/csst-cpic-l1-qc0/
#!/bin/bash
# QC0 entry point, invoked by the scalebox agent.
#   $1: task message (JSON)
#   $2: message headers (JSON, possibly backslash-escaped)
# Runs the CPIC L1 QC0 step, forwards logs/attributes back to the agent via
# files under /work, and exits with the pipeline's status.
message=$1
echo "message: "$message
# Strip backslash escapes so the headers parse as JSON.
headers=$(echo "$2" | sed 's/\\//g')
echo "$headers" | jq '.'
echo "headers: $headers"
has_data_list=$(echo "$headers" | jq 'has("data_list")')
if [ "$has_data_list" = "true" ]; then
# Extract the value of the data_list field.
data_list=$(echo "$headers" | jq -r '.data_list')
# Quote bare 24-char hex ids so data_list becomes valid JSON.
fixed_data_list=$(echo "$data_list" | sed 's/\([a-f0-9]\{24\}\)/"\1"/g')
# Re-insert data_list into the message and compact it to one line.
message=$(echo "$message" | jq --argjson dl "$fixed_data_list" '. + {data_list: $dl}')
message=$(echo "$message" | jq -c '.')
fi
cd /pipeline/output/
# python /pipeline/app/run.py $message
# `run` is presumably the pipeline launcher shipped in the base image —
# TODO confirm.
run -p $message
exit_code=$?
# Collect the pipeline log for the agent's custom output.
echo "=====pipeline.log====" > /work/custom-out.txt
cat /pipeline/output/pipeline.log >> /work/custom-out.txt
# echo "======module.log======" >> /work/custom-out.txt
# cat /pipeline/output/module.log|tail -n 100 >> /work/custom-out.txt
# Preserve the pipeline's timestamp file for the agent, if one was produced.
timefile=/pipeline/output/timestamp.txt
if test -f "$timefile"; then
echo "$timefile exist"
mv /pipeline/output/timestamp.txt /work/timestamps.txt
fi
rm -rf /pipeline/input/* /pipeline/output/*
# Extract dag_run_id from the raw (still escaped) headers via regex.
headers=$2
pattern='"dag_run_id":"([^"]+)"'
if [[ $headers =~ $pattern ]]; then
dag_run_id="${BASH_REMATCH[1]}"
echo "dag_run_id: $dag_run_id"
else
# no dag_run_id in json
dag_run_id=""
fi
echo "dag_run_id:$dag_run_id" >> /work/extra-attributes.txt
if [ $exit_code -eq 0 ]; then
# scalebox task add --header dag_run_id=${dag_run_id} --header repeatable=yes --upsert ${message}
# On success, emit the original message for downstream jobs.
# NOTE(review): WORK_DIR must be set by the agent environment — verify.
echo "$1" > ${WORK_DIR}/messages.txt
echo "finish cpic-l1-qc0."
else
echo "finish cpic-l1-qc0, exit_code: $exit_code"
fi
exit $exit_code
# App-level Makefile: manage the scalebox application on the target cluster.
CLUSTER=csst

# None of these targets produce files.
.PHONY: all run reset down list

# NOTE(review): `build` is not defined in this Makefile — confirm it comes
# from an include or is a leftover.
all: reset build

# Create the application, loading environment values from zjs.env.
run:
	scalebox app create --env-file zjs.env

# Rebuild/restart the cluster definition. $(MAKE) -C replaces the previous
# `cd … && make && cd -` so -j/-n flags propagate correctly.
reset:
	$(MAKE) -C ${HOME}/docker-scalebox/clusters/$(CLUSTER)

down:
	$(MAKE) -C ${HOME}/docker-scalebox/clusters/$(CLUSTER) down

list:
	$(MAKE) -C ${HOME}/docker-scalebox/clusters/$(CLUSTER) list
#!/bin/bash
# Seed test tasks: insert one t_task row per observation id (41000000101..104)
# into the scalebox database for the given job.
#
#   $1: numeric job id (t_task.job)
POSTGRES_USER=scalebox
POSTGRES_HOST=localhost
POSTGRES_DB=scalebox
PGPORT=5432

job_id=$1
# FIX: the job id is interpolated into SQL below — refuse anything that is
# not a plain integer instead of passing arbitrary text through.
if ! [[ "$job_id" =~ ^[0-9]+$ ]]; then
    echo "usage: $0 <numeric-job-id>" >&2
    exit 1
fi

for m in {41000000101..41000000104}; do
    echo "$m"
    docker exec -t database psql -U "${POSTGRES_USER}" -h "${POSTGRES_HOST}" -d "${POSTGRES_DB}" -p "${PGPORT}" \
        -c "INSERT INTO t_task(job,key_message) VALUES(${job_id},'${m}')"
done
# Scalebox application definition: CSST coronagraph (CPIC) level-1 pipeline.
name: cpic-l1.app.process
label: 星冕仪
comment: 精测模块星冕仪一级流水线
cluster: csst
parameters:
# Start the application in RUNNING state immediately.
initial_status: RUNNING
jobs:
cpic-l1:
label: 星冕仪
base_image: cnic/csst-cpic-l1
schedule_mode: HEAD
variables:
always_running: yes
reserved_on_exit: yes
# Cap forwarded process output at 100 kB, keeping the tail of the text.
output_text_size: 100000
text_tranc_mode: TAIL
locale_mode: NONE
parameters:
# start_message: 41000000101
# Split each message key into two groups (first 6 chars, next 3) — the
# commented-out start_message above suggests 9+-digit observation ids;
# verify the intended key format.
key_group_regex: ^(.{6})(.{3})$
key_group_seq: 1,2
# DFS/CRDS service credentials, supplied by the app environment
# (presumably zjs.env — see the accompanying Makefile's --env-file).
environments:
- CSST_DFS_API_MODE=${CSST_DFS_API_MODE}
- CSST_DFS_GATEWAY=${CSST_DFS_GATEWAY}
- CSST_DFS_APP_ID=${CSST_DFS_APP_ID}
- CSST_DFS_APP_TOKEN=${CSST_DFS_APP_TOKEN}
- CRDS_SERVER_URL=${CRDS_SERVER_URL}
# Host-to-container mounts for auxiliary data, DFS root and CRDS refs.
paths:
- ${CSST_AUX_DIR}:/pipeline/aux
- ${CSST_DFS_ROOT}:/dfsroot
- ${CSST_CRDS_REF}:/crdsref
# command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
# sink_jobs:
# hosts:
# - h0:1
# - c0:1
# - c1:1
# - c2:1
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment