Source code for gs_interactive.client.utils
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from enum import Enum
[docs]
class Encoder:
"""
A simple encoder to encode the data into bytes
"""
[docs]
def __init__(self, endian="little") -> None:
self.byte_array = bytearray()
self.endian = endian
[docs]
def put_int(self, value: int):
"""
Put an integer into the byte array, 4 bytes
"""
self.byte_array.extend(value.to_bytes(4, byteorder=self.endian))
[docs]
def put_long(self, value: int):
"""
Put a long integer into the byte array, 8 bytes
"""
self.byte_array.extend(value.to_bytes(8, byteorder=self.endian))
[docs]
def put_string(self, value: str):
"""
Put a string into the byte array, first put the length of the string, then the string
"""
self.put_int(len(value))
self.byte_array.extend(value.encode("utf-8"))
[docs]
def put_byte(self, value: int):
"""
Put a single byte into the byte array
"""
self.byte_array.extend(value.to_bytes(1, byteorder=self.endian))
[docs]
def put_bytes(self, value: bytes):
"""
Put a byte array into the byte array
"""
self.byte_array.extend(value)
[docs]
def put_double(self, value: float):
"""
Put a double into the byte array, 8 bytes
"""
self.byte_array.extend(value.to_bytes(8, byteorder=self.endian))
[docs]
def get_bytes(self):
"""
Get the bytes from the byte array
"""
# return bytes not bytearray
return bytes(self.byte_array)
[docs]
class Decoder:
"""
A simple decoder to decode the bytes into data
"""
[docs]
def __init__(self, byte_array: bytearray, endian="little") -> None:
self.byte_array = byte_array
self.index = 0
self.endian = endian
[docs]
def get_int(self) -> int:
"""
Get an integer from the byte array, 4 bytes
returns: int
"""
value = int.from_bytes(
self.byte_array[self.index : self.index + 4], # noqa E203
byteorder=self.endian,
)
self.index += 4
return value
[docs]
def get_long(self) -> int:
"""
Get a long integer from the byte array, 8 bytes
returns: int
"""
value = int.from_bytes(
self.byte_array[self.index : self.index + 8], # noqa E203
byteorder=self.endian,
)
self.index += 8
return value
[docs]
def get_double(self) -> float:
"""
Get a double from the byte array, 8 bytes
returns: float
"""
value = float.from_bytes(
self.byte_array[self.index : self.index + 8], # noqa E203
byteorder=self.endian,
)
self.index += 8
return value
[docs]
def get_byte(self) -> int:
"""
Get a single byte from the byte array
returns: int
"""
value = int.from_bytes(
self.byte_array[self.index : self.index + 1], # noqa E203
byteorder=self.endian,
)
self.index += 1
return value
[docs]
def get_bytes(self, length: int) -> bytes:
"""
Get a byte array from the byte array
returns: A byte array
"""
value = self.byte_array[self.index : self.index + length] # noqa E203
self.index += length
return value
[docs]
def get_string(self) -> str:
"""
Get a string from the byte array, first get the length of the string, then the string
returns: str
"""
length = self.get_int()
value = self.byte_array[self.index : self.index + length].decode( # noqa E203
"utf-8"
)
self.index += length
return value
[docs]
def is_empty(self) -> bool:
"""
Whether the byte array is empty
"""
return self.index == len(self.byte_array)
class InputFormat(Enum):
CPP_ENCODER = 0 # raw bytes encoded by encoder/decoder
CYPHER_JSON = 1 # json format string
CYPHER_PROTO_ADHOC = 2 # protobuf adhoc bytes
CYPHER_PROTO_PROCEDURE = 3 # protobuf procedure bytes
def append_format_byte(input: bytes, input_format: InputFormat):
"""
Append a byte to the end of the input string to denote the input format
"""
new_bytes = input + bytes([input_format.value])
return new_bytes