Commit a8f310b1 authored by zxl

Record standard output, standard error, and user-defined logs separately; change timestamp recording to /work/timestamps.txt

parent f405ae17
import grpc
import control_pb2_grpc
import control_pb2
import psycopg2
import os
import sys


class admL1Api:
    def __init__(self):
        self.body = ""

    def adminLevelOne(self, body):
        # The first character of the message encodes the observation type (obsid)
        obsid = int(str(body)[0])
        # Extract the body proper, if needed
        # body = body.split(",", 1)[1]
        shost = os.getenv('CSST_SCALEBOX_HOST')
        sport = os.getenv('CSST_SCALEBOX_PORT')
        suser = os.getenv('CSST_SCALEBOX_USER')
        spwd = os.getenv('CSST_SCALEBOX_PWD')
        sdb = os.getenv('CSST_SCALEBOX_DATABASE')
        # The module id could also be taken from an environment variable set in advance:
        # current_job_id = os.getenv('CSST_ADML1_APPID')
        # conn = psycopg2.connect(host="10.255.2.12", port=5433, user="scalebox", password="changeme", database="scalebox")
        conn = psycopg2.connect(host=shost, port=sport, user=suser, password=spwd, database=sdb)
        cursor = conn.cursor()
        # sql = "SELECT id,name FROM t_app;"
        # Look up the job id of admL1 by its name
        admsql = "SELECT id FROM t_job WHERE name = 'admL1'"
        cursor.execute(admsql)
        admrows = cursor.fetchone()
        current_job_id = admrows[0]
        cursor.close()
        conn.close()
        sink_job_name = ""
        if obsid == 1:
            # observation type 1 fans out to both downstream modules
            admL1Api.sum_numbers(body, "mbi", current_job_id)
            admL1Api.sum_numbers(body, "sls2d", current_job_id)
        elif obsid == 2:
            sink_job_name = "mci"
        elif obsid == 3:
            sink_job_name = "ifs"
        elif obsid == 4:
            sink_job_name = "cpic"
        elif obsid == 5:
            sink_job_name = "hstdm"
        else:
            sink_job_name = ""
        if sink_job_name:
            admL1Api.sum_numbers(body, sink_job_name, current_job_id)
        else:
            print('waiting for module transfer')
        print("finished")

    @classmethod
    def sum_numbers(cls, body, sink_job_name, current_job_id):
        # Forward the message via the gRPC SendJobMessage call
        channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER'))
        stub = control_pb2_grpc.ControlServiceStub(channel)
        test = control_pb2.JobKey()
        # id of the current module
        test.builtin_job_id.current_job_id = int(current_job_id)
        # name of the downstream module
        test.builtin_job_id.sink_job_name = sink_job_name
        test.key_text = body
        reflag = stub.SendJobMessage(test)
        print("rowname : %s" % sink_job_name)
        print("reflag : %d" % reflag.value)
        return reflag.value


if __name__ == '__main__':
    parameter = sys.argv
    body = parameter[1]
    w = admL1Api()
    w.adminLevelOne(body)
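
For reference, the gateway takes a single command-line argument whose first digit selects the branch above. A minimal invocation sketch, assuming the file is saved as adml1_api.py (the actual file name is not shown in this commit) and the CSST_SCALEBOX_* and CSST_PIPELINE_GRPC_SERVER variables are set:

# hypothetical file name; the leading "1" fans the message out to mbi and sls2d
python adml1_api.py "1100000100"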
CREATE SEQUENCE t_obs_id_seq INCREMENT 1 MINVALUE 1 MAXVALUE 2147483647 START 1 CACHE 1;
CREATE TABLE "public"."t_obs" (
"id" integer DEFAULT nextval('t_obs_id_seq') NOT NULL,
"obs_x" integer NOT NULL,
"job_name" text NOT NULL,
CONSTRAINT "t_obs_pkey" PRIMARY KEY ("id")
) WITH (oids = false);
INSERT INTO "t_obs" ("id", "obs_x", "job_name") VALUES
(1, 1, 'mbi'),
(2, 1, 'sls2d'),
(3, 2, 'MCI'),
(4, 3, 'IFS'),
(5, 4, 'CPIC'),
(6, 5, 'HSTDM');
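
These rows mirror the branch table hard-coded in adminLevelOne above: obs_x 1 maps to both mbi and sls2d, the rest one-to-one. A quick sanity check, reusing the docker-exec psql convention of the task scripts below (connection details assumed to match):

docker exec -t database psql -U scalebox -d scalebox \
    -c "SELECT job_name FROM t_obs WHERE obs_x = 1"   # expected: mbi, sls2d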
#!/bin/bash
# clear directories
#rm -rf /L1Pipeline/L0/*
#rm -rf /L1Pipeline/L1/*
# obsid 100000100 ~ 100000154
## stress test: keep the last three digits, restore the original 100000 prefix
#test_obsid=$1
#obsid="100000"${test_obsid:0-3:3}
###
# obsid 10160000000 - 10160000136
obsid=$1
#python /L1Pipeline/build/csst_l1/app/l1_mbi_tcc.py $obsid > /work/stdout 2> /work/stderr
l1-mbi --obs-id=$obsid --device=cpu --n-jobs=18 --n-jobs-gpu=18 --clean-l0 --clean-l1 --cleanup --dfs-node=pml --ver-sim=C6.2 --no-photometry --no-astrometry
exit_code=$?
# collect the user-defined logs into /work/user-file.txt
echo "=====csst-l1ppl.log====" > /work/user-file.txt
cat /L1Pipeline/L1/csst-l1ppl.log >> /work/user-file.txt
echo "======csst-l1mod.log======" >> /work/user-file.txt
tail -n 100 /L1Pipeline/L1/csst-l1mod.log >> /work/user-file.txt
rm -rf /L1Pipeline/L0/* /L1Pipeline/L1/*
# record the timestamps file for this task
mv /tmp/timestamps.txt /work/timestamps.txt
echo finish mbi.
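
After a run, the artifacts this script leaves under /work are the combined user log and the raw timestamps; a quick check (paths taken from the script above):

ls -l /work/user-file.txt /work/timestamps.txt
head /work/timestamps.txt    # presumably one timestamp per line, as written by the pipeline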
@@ -3,7 +3,7 @@ CLUSTER=csst
all: reset build
run:
	scalebox app create --env-file zjs.env
reset:
	cd ${HOME}/docker-scalebox/clusters/$(CLUSTER) && make && cd -
#!/bin/bash
POSTGRES_USER=scalebox
@@ -5,13 +6,11 @@ POSTGRES_HOST=localhost
POSTGRES_DB=scalebox
PGPORT=5432
job_id=$1
for m in {10160000000..10160000001}; do
    echo $m
    docker exec -t database psql -U ${POSTGRES_USER} -h ${POSTGRES_HOST} -d ${POSTGRES_DB} -p ${PGPORT} \
        -c "INSERT INTO t_task(job,key_message) VALUES(${job_id},'${m}')"
done
@@ -8,17 +8,25 @@ jobs:
  mbi:
    base_image: csst/mbi
    schedule_mode: HEAD
    variables:
      # repeated: yes
      output_text_size: 100000
      text_tranc_mode: TAIL
      locale_mode: NONE
    parameters:
      key_group_regex: ^(.{6})(.{3})$
      key_group_seq: 1,2
    paths:
      - ${AUX_DIR}:/L1Pipeline/aux
      - ${CRDS_DIR}:/L1Pipeline/aux/C6.1_ref_crds
      - ${DFS_ROOT}:/dfsroot
    # command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
    # sink_jobs:
    hosts:
      - h0:1
      - c0:2
      - c1:2
      - c2:2
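
Here key_group_regex splits the 9-character task key into two groups, and key_group_seq orders them; a key such as 100000123 presumably yields the groups 100000 and 123. The same match expressed in bash:

[[ "100000123" =~ ^(.{6})(.{3})$ ]] && echo "${BASH_REMATCH[1]} / ${BASH_REMATCH[2]}"   # prints: 100000 / 123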
@@ -15,3 +15,5 @@ run:
	docker run -it --entrypoint bash $(IMAGE_NAME)
down:
	docker stop $(IMAGE_NAME)
scp:
	scp -r ./ csst-zjs:/root/csst/sls2d/
#!/bin/bash
# clear directories
rm -rf /L1Pipeline/L0/*
rm -rf /L1Pipeline/L1/*
# obsid 100000100 ~ 100000154
## stress test: keep the last three digits, restore the original 100000 prefix
# test_obsid=$1
# obsid="100000"${test_obsid:0-3:3}
###
# obsid 10160000000 - 10160000136
obsid=$1
#python /L1Pipeline/build/csst_l1/app/l1_sls2d_tcc.py $obsid > /work/stdout 2> /work/stderr
l1-sls2d --obs-id=$obsid --device=cpu --n-jobs=18 --n-jobs-gpu=18 --clean-l0 --clean-l1 --cleanup --dfs-node=pml --ver-sim=C6.2 --no-photometry --no-astrometry > /work/stdout 2> /work/stderr
exit_code=$?
# collect the user-defined logs into /work/user-file.txt
echo "=====csst-l1ppl.log====" > /work/user-file.txt
cat /L1Pipeline/L1/csst-l1ppl.log >> /work/user-file.txt
echo "======csst-l1mod.log======" >> /work/user-file.txt
tail -n 100 /L1Pipeline/L1/csst-l1mod.log >> /work/user-file.txt
rm -rf /L1Pipeline/L0/* /L1Pipeline/L1/*
# record the timestamps file for this task
mv /tmp/timestamps.txt /work/timestamps.txt
echo finish sls2d.
@@ -3,7 +3,7 @@ CLUSTER=local
# all: reset build
run:
	scalebox app create --env-file zjs.env
reset:
	cd ${HOME}/docker-scalebox/clusters/$(CLUSTER) && make && cd -
#!/bin/bash
POSTGRES_USER=scalebox
POSTGRES_HOST=localhost
POSTGRES_DB=scalebox
PGPORT=5432
job_id=$1
for m in {10160000000..10160000136}; do
    echo $m
    docker exec -t database psql -U ${POSTGRES_USER} -h ${POSTGRES_HOST} -d ${POSTGRES_DB} -p ${PGPORT} \
        -c "INSERT INTO t_task(job,key_message) VALUES(${job_id},'${m}')"
done
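
A typical invocation (the script file name is hypothetical; the argument is the numeric id of the job whose queue should be seeded, such as the id looked up from t_job):

./insert-tasks.sh 45   # inserts one t_task row per obsid in 10160000000..10160000136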
@@ -9,15 +9,19 @@ jobs:
    label: slitless spectroscopy 2D
    base_image: csst/sls2d
    schedule_mode: HEAD
    variables:
      # repeated: yes
      output_text_size: 100000
      text_tranc_mode: TAIL
      locale_mode: NONE
    parameters:
      key_group_regex: ^(.{6})(.{3})$
      key_group_seq: 1,2
    paths:
      - ${AUX_DIR}:/L1Pipeline/aux
      - ${CRDS_DIR}:/L1Pipeline/aux/C6.1_ref_crds
      - ${DFS_ROOT}:/dfsroot
    # sink_jobs:
    # hosts:
    #   - h0:1
    #   - c0:3
@@ -5,21 +5,6 @@ parameters:
  initial_status: RUNNING
jobs:
  sls2d:
    label: slitless spectroscopy
    base_image: csst/sls2d
CLUSTER=local
# all: reset build
run:
	scalebox app create --env-file zjs.env
reset:
	cd ${HOME}/docker-scalebox/clusters/$(CLUSTER) && make && cd -
down:
	make -C ${HOME}/docker-scalebox/clusters/$(CLUSTER) down
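
The usual flow with this Makefile, assuming the cluster definitions live under ${HOME}/docker-scalebox as referenced above:

make reset   # (re)build the local cluster
make run     # create the app with the zjs.env environment
make down    # tear the cluster down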
@@ -8,7 +8,7 @@ PGPORT=5432
job_id=$1
for m in {100000076..100000123}; do
    echo $m
    docker exec -t database psql -U ${POSTGRES_USER} -h ${POSTGRES_HOST} -d ${POSTGRES_DB} -p ${PGPORT} \
        -c "INSERT INTO t_task(job,key_message) VALUES(${job_id},'${m}')"
name: ${PIPELINE_NAME}.app.process
label: csst-${PIPELINE_NAME}
#output_basedir: /data/${PIPELINE_NAME}
cluster: local
parameters:
  initial_status: RUNNING
jobs:
  redis-cli:
    label: message queue receiver module
    base_image: csst/redis-cli
    schedule_mode: HEAD
    parameters:
      start_message: abcd
    sink_jobs:
      - admL1
  admL1:
    label: level-1 pipeline message gateway
    base_image: csst/adml1
    schedule_mode: HEAD
    variables:
      repeated: yes
    command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
    sink_jobs:
      - mbi
      - sls2d
  mbi:
    label: multi-band imaging
    base_image: csst/mbi
    #schedule_mode: HEAD
    parameters:
      key_group_regex: ^(.{6})(.{3})$
      key_group_seq: 1,2
    paths:
      - ${AUX_DIR}:/L1Pipeline/aux
      - ${CRDS_DIR}:/L1Pipeline/aux/C6.1_ref_crds
      - ${DFS_ROOT}:/dfsroot
    # command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
    # sink_jobs:
    hosts:
      - h0:1
      - c0:1
      - c1:1
      - c2:1
  sls2d:
    label: slitless spectroscopy 2D
    base_image: csst/sls2d
    #schedule_mode: HEAD
    parameters:
      key_group_regex: ^(.{6})(.{3})$
      key_group_seq: 1,2
    paths:
      - ${AUX_DIR}:/L1Pipeline/aux
      - ${CRDS_DIR}:/L1Pipeline/aux/C6.1_ref_crds
      - ${DFS_ROOT}:/dfsroot
    # command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
    # sink_jobs:
    hosts:
      - h0:1
      - c0:1
      - c1:1
      - c2:1
AUX_DIR=/goosefsx/x_c60_o19xp6c1_proxy/L1Pipeline/aux
CRDS_DIR=/goosefsx/x_c60_o19xp6c1_proxy/L1Pipeline/aux/products_ref20_3hdr
DFS_ROOT=/goosefsx/x_c60_o19xp6c1_proxy/dfs_root
#PIPELINE_NAME=lever1
#PIPELINE_NAME=mbi
PIPELINE_NAME=sls2d
AUX_DIR=/sharewcl/L1Pipeline/aux
CRDS_DIR=/sharewcl/OnOrbitCal/SimData/ref_202211/products_ref20_3hdr
DFS_ROOT=/sharewcl/dfs
PIPELINE_NAME=csst_lever1
#PIPELINE_NAME=mbi
#PIPELINE_NAME=sls2d
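
These env files supply the values substituted into the YAML above (${AUX_DIR}, ${CRDS_DIR}, ${DFS_ROOT}, ${PIPELINE_NAME}); switching deployments is then just a matter of which file the Makefile's run target passes:

# Which of the two files above corresponds to zjs.env is not shown in this
# commit; the invocation itself comes from the Makefile's run target.
scalebox app create --env-file zjs.env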