Commit 6b2e615d authored by zxl's avatar zxl
Browse files

CSST Pyscalebox

parents
Pipeline #20 canceled with stages
FROM hub.cstcloud.cn/scalebox/agent:dev
LABEL maintainer="Xiaoli Zhang<zhangxiaoli@cnic.cn>"
# 安装python3
RUN apt-get update \
&& apt-get install -y python3 python3-pip \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
#安装第三方包 grpcio、protobuf、grpcio_tools
RUN pip install grpcio
RUN pip install protobuf
RUN pip install grpcio_tools
ENV CSST_ADML2_JOB_ID=2
ENV CSST_ADML2_NAME="admL2"
ENV CSST_ADML1_JOB_ID=3
ENV CSST_ADML1_NAME="admL1"
# grpc连接参数
ENV CSST_PIPELINE_GRPC_SERVER="122.96.144.152:39092"
# ENV CSST_PIPELINE_GRPC_SERVER="10.255.2.11:50051"
COPY *.sh /app/bin/
COPY *.py /app/bin/
COPY *.proto /app/bin/
#controld version 202208
RUN cd /app/bin/ \
&& mkdir controld \
&& python3 -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. control.proto -I=control
RUN cd /app/bin/ \
&& python3 setup.py sdist \
&& python3 setup.py bdist_wheel \
&& pip install twine \
# && twine upload dist/* \
\ No newline at end of file
IMAGE_NAME:=hub.cstcloud.cn/csst/pyscalebox:dev
build:
docker build --network=host -t $(IMAGE_NAME) .
dist:
docker save $(IMAGE_NAME) | zstdmt | pv | ssh c0 'zstd -d | docker load'
push:
docker push $(IMAGE_NAME)
run:
docker run -it --entrypoint bash $(IMAGE_NAME)
down:
docker stop $(IMAGE_NAME)
# pyscalebox APIs library
## Installation
`pyscalebox` can be installed with the following command:
```bash
git clone
cd pyscalebox
pip install -r requirements.txt
python setup.py install
```
## Configuration
- CSST_PIPELINE_GRPC_SERVER = scalebox plat server # default: 122.96.144.152:39092
- CSST_ADML2_JOB_ID = Admin level2 ID # default: 2
\ No newline at end of file
syntax = "proto3";
package scalebox;
option go_package="./;scalebox";
option java_multiple_files = true;
// option java_outer_classname = "ControlProto";
option java_package = "cn.scalebox.gprc";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
service ControlService {
// //////////////////////////////////////////////////////////////////
// agent client
// //////////////////////////////////////////////////////////////////
// input : slot id('ON')
// return : job-key of the task, id==0(NULL)
// task id : 'READY/-1' -> 'QUEUED/-2'
rpc GetNextTask(google.protobuf.Int32Value) returns (TaskItem);
// input : task id('QUEUED/-2')
// return : ret_code : 0(OK), -1(task NOT FOUND), -2(WRONG STATUS)
// task_status_code:'QUEUED'/-2 -> 'RUNNING'/-3
rpc SetTaskStarted(google.protobuf.Int64Value) returns (google.protobuf.Int32Value);
// input : TaskExecMessage
// return : ret_code : 0(OK), -1(task NOT FOUND), -2(task WRONG STATUS)
// task_status_code:'RUNNING'/-3 -> 'OK'/0, ...
rpc SetTaskFinished(TaskExecMessage) returns (google.protobuf.Int32Value);
// slot exit automatically, called in agent side
// input : slot id
// return : ret_code : 0(OK), -1(slot NOT FOUND), -2(slot WRONG STATUS)
// slot : 'ON' -> 'OFF'
rpc SetSlotTerminated(google.protobuf.Int32Value) returns (google.protobuf.Int32Value);
// //////////////////////////////////////////////////////////////////
// actuator client
// //////////////////////////////////////////////////////////////////
// input: token_string ?
// job:'RUNNING' && slot:'READY'
rpc GetRunnableSlotList(google.protobuf.Empty) returns (CommandList);
// input : slot id
// return : ret_code : 0(OK), -1(slot NOT FOUND), -2(slot WRONG STATUS)
// slot: 'READY' -> 'ON'
rpc SetSlotInitialized(google.protobuf.Int32Value) returns (google.protobuf.Int32Value);
// job:'PAUSED'/'ARCHIVED' && slot:'ON'
rpc GetTerminableSlotList(google.protobuf.Empty) returns (CommandList);
// job:'RUNNING' && worker:'PAUSED'
rpc GetRunnableWorkerList(google.protobuf.Empty) returns (CommandList);
// input : worker id
// return : ret_code : 0(OK), -1(worker NOT FOUND), -2(worker WRONG STATUS)
// worker: 'PAUSED' -> 'RUNNING'
rpc SetWorkerInitialized(google.protobuf.Int32Value) returns (google.protobuf.Int32Value);
// job:'PAUSED' && worker:'RUNNING'
rpc GetTerminableWorkerList(google.protobuf.Empty) returns (CommandList);
// input : worker id, return ret_code;
// return : ret_code : 0(OK), -1(worker NOT FOUND), -2(worker WRONG STATUS)
// worker : 'RUNNING' -> 'PAUSED'
rpc SetWorkerTerminated(google.protobuf.Int32Value) returns (google.protobuf.Int32Value);
// //////////////////////////////////////////////////////////////////
// app client , called by user app.
// //////////////////////////////////////////////////////////////////
// send task-key to next job in current pipeline
// return : ret_code : 0(OK), -1(job NOT FOUND)
// task_status_code : 'INITIAL'/-9
rpc SendJobMessage(JobKey) returns (google.protobuf.Int32Value);
// rpc SendToNextJob(JobKey) returns (google.protobuf.Int32Value);
rpc SendJobMessages(JobKeys) returns (google.protobuf.Int32Value);
}
message JobKey {
message JobIdRef{
// qualified name of job
string sink_job_name=1;
int32 current_job_id=2;
}
oneof jobId {
int32 cross_app_job_id=1;
JobIdRef builtin_job_id=2;
}
string key_text=10;
}
message JobKeys {
// qualified name of job
string next_job_name=1;
repeated string key_texts=2;
int32 current_job_id=3;
// OR worker_id ?
}
message TaskItem {
int64 id=1;
string key=2;
}
message Command {
// primary key of slot/worker table
int32 id=1;
string host=2;
string command_text=3;
}
message CommandList {
repeated Command command=1;
}
message TaskExecMessage {
int32 slot=1;
int32 status_code=2;
int64 task_id=3;
int64 input_bytes=4;
int64 output_bytes=5;
google.protobuf.Timestamp t0=6;
google.protobuf.Timestamp t1=7;
google.protobuf.Timestamp t2=8;
google.protobuf.Timestamp t3=9;
google.protobuf.Timestamp t4=10;
repeated google.protobuf.Timestamp time_arr=11;
string sys_out=12;
string app_out=13;
}
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: control.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2
from google.protobuf import wrappers_pb2 as google_dot_protobuf_dot_wrappers__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rcontrol.proto\x12\x08scalebox\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1egoogle/protobuf/wrappers.proto\"\xaf\x01\n\x06JobKey\x12\x1a\n\x10\x63ross_app_job_id\x18\x01 \x01(\x05H\x00\x12\x33\n\x0e\x62uiltin_job_id\x18\x02 \x01(\x0b\x32\x19.scalebox.JobKey.JobIdRefH\x00\x12\x10\n\x08key_text\x18\n \x01(\t\x1a\x39\n\x08JobIdRef\x12\x15\n\rsink_job_name\x18\x01 \x01(\t\x12\x16\n\x0e\x63urrent_job_id\x18\x02 \x01(\x05\x42\x07\n\x05jobId\"K\n\x07JobKeys\x12\x15\n\rnext_job_name\x18\x01 \x01(\t\x12\x11\n\tkey_texts\x18\x02 \x03(\t\x12\x16\n\x0e\x63urrent_job_id\x18\x03 \x01(\x05\"#\n\x08TaskItem\x12\n\n\x02id\x18\x01 \x01(\x03\x12\x0b\n\x03key\x18\x02 \x01(\t\"9\n\x07\x43ommand\x12\n\n\x02id\x18\x01 \x01(\x05\x12\x0c\n\x04host\x18\x02 \x01(\t\x12\x14\n\x0c\x63ommand_text\x18\x03 \x01(\t\"1\n\x0b\x43ommandList\x12\"\n\x07\x63ommand\x18\x01 \x03(\x0b\x32\x11.scalebox.Command\"\x88\x03\n\x0fTaskExecMessage\x12\x0c\n\x04slot\x18\x01 \x01(\x05\x12\x13\n\x0bstatus_code\x18\x02 \x01(\x05\x12\x0f\n\x07task_id\x18\x03 \x01(\x03\x12\x13\n\x0binput_bytes\x18\x04 \x01(\x03\x12\x14\n\x0coutput_bytes\x18\x05 \x01(\x03\x12&\n\x02t0\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12&\n\x02t1\x18\x07 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12&\n\x02t2\x18\x08 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12&\n\x02t3\x18\t \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12&\n\x02t4\x18\n \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12,\n\x08time_arr\x18\x0b \x03(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0f\n\x07sys_out\x18\x0c \x01(\t\x12\x0f\n\x07\x61pp_out\x18\r \x01(\t2\xcd\x07\n\x0e\x43ontrolService\x12>\n\x0bGetNextTask\x12\x1b.google.protobuf.Int32Value\x1a\x12.scalebox.TaskItem\x12J\n\x0eSetTaskStarted\x12\x1b.google.protobuf.Int64Value\x1a\x1b.google.protobuf.Int32Value\x12I\n\x0fSetTaskFinished\x12\x19.scalebox.TaskExecMessage\x1a\x1b.google.protobuf.Int32Value\x12M\n\x11SetSlotTerminated\x12\x1b.google.protobuf.Int32Value\x1a\x1b.google.protobuf.Int32Value\x12\x44\n\x13GetRunnableSlotList\x12\x16.google.protobuf.Empty\x1a\x15.scalebox.CommandList\x12N\n\x12SetSlotInitialized\x12\x1b.google.protobuf.Int32Value\x1a\x1b.google.protobuf.Int32Value\x12\x46\n\x15GetTerminableSlotList\x12\x16.google.protobuf.Empty\x1a\x15.scalebox.CommandList\x12\x46\n\x15GetRunnableWorkerList\x12\x16.google.protobuf.Empty\x1a\x15.scalebox.CommandList\x12P\n\x14SetWorkerInitialized\x12\x1b.google.protobuf.Int32Value\x1a\x1b.google.protobuf.Int32Value\x12H\n\x17GetTerminableWorkerList\x12\x16.google.protobuf.Empty\x1a\x15.scalebox.CommandList\x12O\n\x13SetWorkerTerminated\x12\x1b.google.protobuf.Int32Value\x1a\x1b.google.protobuf.Int32Value\x12?\n\x0eSendJobMessage\x12\x10.scalebox.JobKey\x1a\x1b.google.protobuf.Int32Value\x12\x41\n\x0fSendJobMessages\x12\x11.scalebox.JobKeys\x1a\x1b.google.protobuf.Int32ValueB!\n\x10\x63n.scalebox.gprcP\x01Z\x0b./;scaleboxb\x06proto3')
_JOBKEY = DESCRIPTOR.message_types_by_name['JobKey']
_JOBKEY_JOBIDREF = _JOBKEY.nested_types_by_name['JobIdRef']
_JOBKEYS = DESCRIPTOR.message_types_by_name['JobKeys']
_TASKITEM = DESCRIPTOR.message_types_by_name['TaskItem']
_COMMAND = DESCRIPTOR.message_types_by_name['Command']
_COMMANDLIST = DESCRIPTOR.message_types_by_name['CommandList']
_TASKEXECMESSAGE = DESCRIPTOR.message_types_by_name['TaskExecMessage']
JobKey = _reflection.GeneratedProtocolMessageType('JobKey', (_message.Message,), {
'JobIdRef' : _reflection.GeneratedProtocolMessageType('JobIdRef', (_message.Message,), {
'DESCRIPTOR' : _JOBKEY_JOBIDREF,
'__module__' : 'control_pb2'
# @@protoc_insertion_point(class_scope:scalebox.JobKey.JobIdRef)
})
,
'DESCRIPTOR' : _JOBKEY,
'__module__' : 'control_pb2'
# @@protoc_insertion_point(class_scope:scalebox.JobKey)
})
_sym_db.RegisterMessage(JobKey)
_sym_db.RegisterMessage(JobKey.JobIdRef)
JobKeys = _reflection.GeneratedProtocolMessageType('JobKeys', (_message.Message,), {
'DESCRIPTOR' : _JOBKEYS,
'__module__' : 'control_pb2'
# @@protoc_insertion_point(class_scope:scalebox.JobKeys)
})
_sym_db.RegisterMessage(JobKeys)
TaskItem = _reflection.GeneratedProtocolMessageType('TaskItem', (_message.Message,), {
'DESCRIPTOR' : _TASKITEM,
'__module__' : 'control_pb2'
# @@protoc_insertion_point(class_scope:scalebox.TaskItem)
})
_sym_db.RegisterMessage(TaskItem)
Command = _reflection.GeneratedProtocolMessageType('Command', (_message.Message,), {
'DESCRIPTOR' : _COMMAND,
'__module__' : 'control_pb2'
# @@protoc_insertion_point(class_scope:scalebox.Command)
})
_sym_db.RegisterMessage(Command)
CommandList = _reflection.GeneratedProtocolMessageType('CommandList', (_message.Message,), {
'DESCRIPTOR' : _COMMANDLIST,
'__module__' : 'control_pb2'
# @@protoc_insertion_point(class_scope:scalebox.CommandList)
})
_sym_db.RegisterMessage(CommandList)
TaskExecMessage = _reflection.GeneratedProtocolMessageType('TaskExecMessage', (_message.Message,), {
'DESCRIPTOR' : _TASKEXECMESSAGE,
'__module__' : 'control_pb2'
# @@protoc_insertion_point(class_scope:scalebox.TaskExecMessage)
})
_sym_db.RegisterMessage(TaskExecMessage)
_CONTROLSERVICE = DESCRIPTOR.services_by_name['ControlService']
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\020cn.scalebox.gprcP\001Z\013./;scalebox'
_JOBKEY._serialized_start=122
_JOBKEY._serialized_end=297
_JOBKEY_JOBIDREF._serialized_start=231
_JOBKEY_JOBIDREF._serialized_end=288
_JOBKEYS._serialized_start=299
_JOBKEYS._serialized_end=374
_TASKITEM._serialized_start=376
_TASKITEM._serialized_end=411
_COMMAND._serialized_start=413
_COMMAND._serialized_end=470
_COMMANDLIST._serialized_start=472
_COMMANDLIST._serialized_end=521
_TASKEXECMESSAGE._serialized_start=524
_TASKEXECMESSAGE._serialized_end=916
_CONTROLSERVICE._serialized_start=919
_CONTROLSERVICE._serialized_end=1892
# @@protoc_insertion_point(module_scope)
This diff is collapsed.
echo "CSST-PYSCALEBOX Installer"
echo "=============================================="
version=""
user=""
pip uninstall pyscalebox -y
echo "Installing CSST PYSCALEBOX API with Version:$version"
echo "➡==============================================="
pip install$user git+http://hub.cstcloud.cn/git/csst/pyscalebox.git$version
\ No newline at end of file
[build-system]
requires = [
"setuptools>=42",
"wheel"
]
build-backend = "setuptools.build_meta"
\ No newline at end of file
import grpc
import control_pb2_grpc
import control_pb2
import os
class PyScaleboxApi():
def __init__(self):
self.body = ""
self.a = 0
#2级流水线接口,body:brickid,name1,name2,name3
def send_message_L2(self,body):
channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER'))
stub = control_pb2_grpc.ControlServiceStub(channel)
test = control_pb2.JobKey()
test.cross_app_job_id = int(os.getenv('CSST_ADML2_JOB_ID'))
test.key_text = body
reflag = stub.SendJobMessage(test)
print(reflag.value)
return reflag.value
#1级流水线接口,body:obsid或曝光id,moduleid
def send_message_L1(self,body):
channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER'))
stub = control_pb2_grpc.ControlServiceStub(channel)
test = control_pb2.JobKey()
test.cross_app_job_id = int(os.getenv('CSST_ADML1_JOB_ID'))
print("test.cross_app_job_id = "+os.getenv('CSST_ADML1_JOB_ID'))
test.key_text = body
self.body = body
reflag = stub.SendJobMessage(test)
print(reflag.value)
return reflag.value
#!/bin/bash
python3 pyscalebox.py
\ No newline at end of file
[metadata]
name = pyscalebox
from setuptools import setup,find_packages
setup(name='pyscalebox',
version='0.0.3',
description='level 2 pipeline api:send_message_L2(body)',
author='zhangxiaoli',
author_email='zhangxiaoli@cnic.cn',
requires=['os','grpc'], # 定义依赖哪些模块
py_modules=['pyscalebox','control_pb2','control_pb2_grpc'], # 系统自动从当前目录开始找包
# packages=find_packages(),
license="apache 3.0"
)
\ No newline at end of file
import os
os.environ['CSST_PIPELINE_GRPC_SERVER']="10.255.2.11:50051"
os.environ['CSST_ADML2_JOB_ID']="2"
from pyscalebox import PyScaleboxApi
body="brickid,name1csfszxc"
PyScaleboxApi().send_message_L2(body)
# msg = "hello world"
# print(msg)
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment