from typing import List, Union
import logging
import grpc
import time
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.emulation_config.node_container_config import NodeContainerConfig
from csle_common.dao.emulation_config.five_g_du_managers_info import FiveGDUManagersInfo
import csle_common.constants.constants as constants
import csle_collector.constants.constants as csle_collector_constants
import csle_collector.five_g_du_manager.five_g_du_manager_pb2_grpc
import csle_collector.five_g_du_manager.five_g_du_manager_pb2
import csle_collector.five_g_du_manager.five_g_du_manager_util
import csle_collector.five_g_du_manager.query_five_g_du_manager
from csle_common.util.emulation_util import EmulationUtil
[docs]class FiveGDUController:
"""
Class controlling 5G dus running on nodes in the emulations, as well as 5G DU managers
"""
[docs] @staticmethod
def start_five_g_du_managers(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
logger: logging.Logger) -> None:
"""
Utility method for starting 5G DU managers
:param emulation_env_config: the emulation env config
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: None
"""
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
for ids_image in constants.CONTAINER_IMAGES.FIVE_G_DU_IMAGES:
if ids_image in c.name:
FiveGDUController.start_five_g_du_manager(emulation_env_config=emulation_env_config,
ip=c.docker_gw_bridge_ip, logger=logger)
[docs] @staticmethod
def start_five_g_du_manager(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) -> None:
"""
Utility method for starting the 5G DU manager on a specific container
:param emulation_env_config: the emulation env config
:param ip: the ip of the container
:param logger: the logger to use for logging
:return: None
"""
# Check if the manager is already running
status = None
not_running = False
try:
status = FiveGDUController.get_five_g_du_status_by_ip_and_port(
ip=ip, port=emulation_env_config.five_g_config.five_g_du_manager_port, timeout=5)
except Exception:
not_running = True
status_str = ""
if status is None:
not_running = True
else:
status_str = f"du_running: {status.du_running}, ip: {status.ip}"
if not_running:
# Connect
EmulationUtil.connect_admin(emulation_env_config=emulation_env_config, ip=ip)
# Stop old background job if running
cmd = (constants.COMMANDS.SUDO + constants.COMMANDS.SPACE_DELIM + constants.COMMANDS.PKILL +
constants.COMMANDS.SPACE_DELIM + constants.TRAFFIC_COMMANDS.FIVE_G_DU_MANAGER_FILE_NAME)
o, e, _ = EmulationUtil.execute_ssh_cmd(
cmd=cmd, conn=emulation_env_config.get_connection(ip=ip))
logger.info(f"Starting 5G DU manager on node {ip}")
# Start the 5G DU manager
cmd = constants.COMMANDS.START_FIVE_G_DU_MANAGER.format(
emulation_env_config.five_g_config.five_g_du_manager_port,
emulation_env_config.five_g_config.five_g_du_manager_log_dir,
emulation_env_config.five_g_config.five_g_du_manager_log_file,
emulation_env_config.five_g_config.five_g_du_manager_max_workers)
o, e, _ = EmulationUtil.execute_ssh_cmd(cmd=cmd, conn=emulation_env_config.get_connection(ip=ip))
time.sleep(2)
else:
logger.info(f"5G du manager was already running on node {ip}. Status: {status_str}")
[docs] @staticmethod
def stop_five_g_du_managers(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
logger: logging.Logger) -> None:
"""
Utility method for stopping 5G DU managers
:param emulation_env_config: the emulation env config
:param physical_server_ip: the IP of the physical host
:param logger: the logger to use for logging
:return: None
"""
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
for ids_image in constants.CONTAINER_IMAGES.FIVE_G_DU_IMAGES:
if ids_image in c.name:
FiveGDUController.stop_five_g_du_manager(emulation_env_config=emulation_env_config,
ip=c.docker_gw_bridge_ip, logger=logger)
[docs] @staticmethod
def stop_five_g_du_manager(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) -> None:
"""
Utility method for stopping a 5G DU manager with a specific IP
:param emulation_env_config: the emulation env config
:param ip: the ip of the container
:param logger: the logger to use for logging
:return: None
"""
# Connect
EmulationUtil.connect_admin(emulation_env_config=emulation_env_config, ip=ip)
logger.info(f"Stopping 5G DU manager on node {ip}")
cmd = (constants.COMMANDS.SUDO + constants.COMMANDS.SPACE_DELIM + constants.COMMANDS.PKILL +
constants.COMMANDS.SPACE_DELIM + constants.TRAFFIC_COMMANDS.FIVE_G_DU_MANAGER_FILE_NAME)
o, e, _ = EmulationUtil.execute_ssh_cmd(cmd=cmd, conn=emulation_env_config.get_connection(ip=ip))
time.sleep(2)
[docs] @staticmethod
def get_five_g_du_manager_statuses(
emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger) \
-> List[csle_collector.five_g_du_manager.five_g_du_manager_pb2.FiveGDUStatusDTO]:
"""
A method that sends a request to the 5G DU manager on every container to get the status of the
5G du
:param emulation_env_config: the emulation config
:param physical_server_ip: the IP of the physical server
:param logger: the logger to use for logging
:return: List of monitor thread statuses
"""
statuses = []
FiveGDUController.start_five_g_du_managers(emulation_env_config=emulation_env_config,
physical_server_ip=physical_server_ip, logger=logger)
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
for ids_image in constants.CONTAINER_IMAGES.FIVE_G_DU_IMAGES:
if ids_image in c.name:
status = FiveGDUController.get_five_g_du_status_by_ip_and_port(
port=emulation_env_config.five_g_config.five_g_du_manager_port,
ip=c.docker_gw_bridge_ip)
statuses.append(status)
return statuses
[docs] @staticmethod
def get_five_g_du_managers_ips(emulation_env_config: EmulationEnvConfig) -> List[str]:
"""
A method that extracts the IPs of the 5G DU managers in a given emulation
:param emulation_env_config: the emulation env config
:return: the list of IP addresses
"""
ips = []
for c in emulation_env_config.containers_config.containers:
for ids_image in constants.CONTAINER_IMAGES.FIVE_G_DU_IMAGES:
if ids_image in c.name:
ips.append(c.docker_gw_bridge_ip)
return ips
[docs] @staticmethod
def get_five_g_du_managers_ports(emulation_env_config: EmulationEnvConfig) -> List[int]:
"""
A method that extracts the ports of the FiveGDUManagers in a given emulation
:param emulation_env_config: the emulation env config
:return: the list of ports
"""
ports = []
for c in emulation_env_config.containers_config.containers:
for ids_image in constants.CONTAINER_IMAGES.FIVE_G_DU_IMAGES:
if ids_image in c.name:
ports.append(emulation_env_config.five_g_config.five_g_du_manager_port)
return ports
[docs] @staticmethod
def get_five_g_du_status_by_ip_and_port(
port: int, ip: str, timeout: int = csle_collector_constants.GRPC.TIMEOUT_SECONDS) \
-> csle_collector.five_g_du_manager.five_g_du_manager_pb2.FiveGDUStatusDTO:
"""
A method that sends a request to the 5G DU manager with a specific port and ip
to get the status of the 5G DU
:param port: the port of the FiveGDUManager
:param ip: the ip of the FiveGDUManager
:param timeout: the timeout of the GRPC query
:return: the status of the FiveGDUManager
"""
with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.five_g_du_manager.five_g_du_manager_pb2_grpc.FiveGDUManagerStub(channel)
status = \
csle_collector.five_g_du_manager.query_five_g_du_manager.get_five_g_du_status(
stub=stub, timeout=timeout)
return status
[docs] @staticmethod
def start_five_g_dus(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
logger: logging.Logger) -> None:
"""
Utility method for starting the 5G DUs of a specific execution
:param emulation_env_config: the emulation env config
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: None
"""
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
for ids_image in constants.CONTAINER_IMAGES.FIVE_G_DU_IMAGES:
if ids_image in c.name:
FiveGDUController.start_five_g_du(emulation_env_config=emulation_env_config,
ip=c.docker_gw_bridge_ip, logger=logger)
[docs] @staticmethod
def start_five_g_du(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) \
-> csle_collector.five_g_du_manager.five_g_du_manager_pb2.FiveGDUStatusDTO:
"""
Utility method for starting the 5G DU on a specific container
:param emulation_env_config: the emulation env config
:param ip: the ip of the container
:param logger: the logger to use for logging
:return: None
"""
logger.info(f"Starting the 5G DU on container with ip {ip} in execution {emulation_env_config.execution_id} "
f"of emulation: {emulation_env_config.name}")
port = emulation_env_config.five_g_config.five_g_du_manager_port
with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.five_g_du_manager.five_g_du_manager_pb2_grpc.FiveGDUManagerStub(channel)
status = csle_collector.five_g_du_manager.query_five_g_du_manager.start_five_g_du(stub=stub)
return status
[docs] @staticmethod
def stop_five_g_dus(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
logger: logging.Logger) -> None:
"""
Utility method for stopping the 5G DUs of a specific execution
:param emulation_env_config: the emulation env config
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: None
"""
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
for ids_image in constants.CONTAINER_IMAGES.FIVE_G_DU_IMAGES:
if ids_image in c.name:
FiveGDUController.stop_five_g_du(emulation_env_config=emulation_env_config,
ip=c.docker_gw_bridge_ip, logger=logger)
[docs] @staticmethod
def stop_five_g_du(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) \
-> csle_collector.five_g_du_manager.five_g_du_manager_pb2.FiveGDUStatusDTO:
"""
Utility method for stopping the 5G DU on a specific container
:param emulation_env_config: the emulation env config
:param ip: the ip of the container
:param logger: the logger to use for logging
:return: None
"""
logger.info(f"Stopping the 5G DU on container with ip {ip} in execution {emulation_env_config.execution_id} "
f"of emulation: {emulation_env_config.name}")
port = emulation_env_config.five_g_config.five_g_du_manager_port
with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.five_g_du_manager.five_g_du_manager_pb2_grpc.FiveGDUManagerStub(channel)
status = csle_collector.five_g_du_manager.query_five_g_du_manager.stop_five_g_du(stub=stub)
return status
# UE
[docs] @staticmethod
def start_five_g_ues(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
logger: logging.Logger) -> None:
"""
Utility method for starting the 5G UEs of a specific execution
:param emulation_env_config: the emulation env config
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: None
"""
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
for ids_image in constants.CONTAINER_IMAGES.FIVE_G_DU_IMAGES:
if ids_image in c.name:
FiveGDUController.start_five_g_ue(emulation_env_config=emulation_env_config,
ip=c.docker_gw_bridge_ip, logger=logger)
[docs] @staticmethod
def start_five_g_ue(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) \
-> csle_collector.five_g_du_manager.five_g_du_manager_pb2.FiveGDUStatusDTO:
"""
Utility method for starting the 5G UE on a specific container
:param emulation_env_config: the emulation env config
:param ip: the ip of the container
:param logger: the logger to use for logging
:return: None
"""
logger.info(f"Starting the 5G UE on container with ip {ip} in execution {emulation_env_config.execution_id} "
f"of emulation: {emulation_env_config.name}")
port = emulation_env_config.five_g_config.five_g_du_manager_port
with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.five_g_du_manager.five_g_du_manager_pb2_grpc.FiveGDUManagerStub(channel)
status = csle_collector.five_g_du_manager.query_five_g_du_manager.start_five_g_ue(stub=stub)
return status
[docs] @staticmethod
def stop_five_g_ues(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
logger: logging.Logger) -> None:
"""
Utility method for stopping the 5G UEs of a specific execution
:param emulation_env_config: the emulation env config
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: None
"""
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
for ids_image in constants.CONTAINER_IMAGES.FIVE_G_DU_IMAGES:
if ids_image in c.name:
FiveGDUController.stop_five_g_ue(emulation_env_config=emulation_env_config,
ip=c.docker_gw_bridge_ip, logger=logger)
[docs] @staticmethod
def stop_five_g_ue(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) \
-> csle_collector.five_g_du_manager.five_g_du_manager_pb2.FiveGDUStatusDTO:
"""
Utility method for stopping the 5G UE on a specific container
:param emulation_env_config: the emulation env config
:param ip: the ip of the container
:param logger: the logger to use for logging
:return: None
"""
logger.info(f"Stopping the 5G UE on container with ip {ip} in execution {emulation_env_config.execution_id} "
f"of emulation: {emulation_env_config.name}")
port = emulation_env_config.five_g_config.five_g_du_manager_port
with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.five_g_du_manager.five_g_du_manager_pb2_grpc.FiveGDUManagerStub(channel)
status = csle_collector.five_g_du_manager.query_five_g_du_manager.stop_five_g_ue(stub=stub)
return status
[docs] @staticmethod
def init_five_g_dus_ues(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
logger: logging.Logger) -> None:
"""
Utility method for initializing the 5G DUs and UEs of a specific execution
:param emulation_env_config: the emulation env config
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: None
"""
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
for ids_image in constants.CONTAINER_IMAGES.FIVE_G_DU_IMAGES:
if ids_image in c.name:
FiveGDUController.init_five_g_du_ue(emulation_env_config=emulation_env_config,
container=c, logger=logger)
[docs] @staticmethod
def init_five_g_du_ue(emulation_env_config: EmulationEnvConfig, container: NodeContainerConfig,
logger: logging.Logger) \
-> Union[csle_collector.five_g_du_manager.five_g_du_manager_pb2.FiveGDUStatusDTO, None]:
"""
Utility method for initializing the 5G UE on a specific container
:param emulation_env_config: the emulation env config
:param container: the container env config
:param ip: the ip of the container
:param logger: the logger to use for logging
:return: None
"""
logger.info(
f"Initializing the 5G UE and DU on container with ip {container.docker_gw_bridge_ip} "
f"in execution {emulation_env_config.execution_id} "
f"of emulation: {emulation_env_config.name}")
du_fronthaul_ip = ""
cu_fronthaul_ip = ""
subscriber = None
gnb_du_id = 0
pci = 1
sector_id = 0
for i, du_ip in enumerate(emulation_env_config.five_g_config.du_fronthaul_ips):
if du_ip in container.get_ips():
du_fronthaul_ip = du_ip
cu_fronthaul_ip = emulation_env_config.five_g_config.du_cus[i]
subscriber = emulation_env_config.five_g_config.subscribers[i]
gnb_du_id = i
pci = i + 1
sector_id = i
if du_fronthaul_ip == "" or cu_fronthaul_ip == "" or subscriber is None:
return None
port = emulation_env_config.five_g_config.five_g_du_manager_port
with grpc.insecure_channel(f'{container.docker_gw_bridge_ip}:{port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.five_g_du_manager.five_g_du_manager_pb2_grpc.FiveGDUManagerStub(channel)
status = csle_collector.five_g_du_manager.query_five_g_du_manager.init_five_g_du_ue(
stub=stub, cu_fronthaul_ip=cu_fronthaul_ip, du_fronthaul_ip=du_fronthaul_ip,
imsi=subscriber.imsi, key=subscriber.key, opc=subscriber.opc, imei=subscriber.imei,
gnb_du_id=gnb_du_id, pci=pci, sector_id=sector_id)
return status
[docs] @staticmethod
def get_five_g_du_managers_info(emulation_env_config: EmulationEnvConfig, active_ips: List[str],
logger: logging.Logger, physical_server_ip: str) -> FiveGDUManagersInfo:
"""
Extracts the information of the 5G DU managers for a given emulation
:param emulation_env_config: the configuration of the emulation
:param active_ips: list of active IPs
:param physical_server_ip: the IP of the physical server
:param logger: the logger to use for logging
:return: a DTO with the status of the 5G DU managers
"""
five_g_du_managers_ips = FiveGDUController.get_five_g_du_managers_ips(
emulation_env_config=emulation_env_config)
five_g_du_managers_ports = FiveGDUController.get_five_g_du_managers_ports(
emulation_env_config=emulation_env_config)
five_g_du_managers_statuses = []
five_g_du_managers_running = []
for ip in five_g_du_managers_ips:
if ip not in active_ips or not EmulationUtil.physical_ip_match(
emulation_env_config=emulation_env_config, ip=ip, physical_host_ip=physical_server_ip):
continue
running = False
status = None
try:
status = FiveGDUController.get_five_g_du_status_by_ip_and_port(
port=emulation_env_config.five_g_config.five_g_du_manager_port, ip=ip)
running = True
except Exception as e:
logger.debug(
f"Could not fetch 5G DU manager status on IP:{ip}, error: {str(e)}, {repr(e)}")
if status is not None:
five_g_du_managers_statuses.append(status)
else:
util = csle_collector.five_g_du_manager.five_g_du_manager_util.FiveGDUManagerUtil
five_g_du_managers_statuses.append(util.five_g_du_status_dto_empty())
five_g_du_managers_running.append(running)
execution_id = emulation_env_config.execution_id
emulation_name = emulation_env_config.name
five_g_du_manager_info_dto = FiveGDUManagersInfo(
five_g_du_managers_running=five_g_du_managers_running, ips=five_g_du_managers_ips,
ports=five_g_du_managers_ports, execution_id=execution_id, emulation_name=emulation_name,
five_g_du_managers_statuses=five_g_du_managers_statuses)
return five_g_du_manager_info_dto
[docs] @staticmethod
def start_five_g_du_monitor_threads(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
logger: logging.Logger) -> None:
"""
A method that sends a request to the 5G DUManager on every container
to start the DU manager and the monitor thread
:param emulation_env_config: the emulation env config
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: None
"""
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
for container_image in constants.CONTAINER_IMAGES.FIVE_G_DU_IMAGES:
if container_image in c.name:
FiveGDUController.start_five_g_du_monitor_thread(emulation_env_config=emulation_env_config,
ip=c.docker_gw_bridge_ip, logger=logger)
[docs] @staticmethod
def start_five_g_du_monitor_thread(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) \
-> None:
"""
A method that sends a request to the 5G FiveGDUManager on a specific IP
to start the 5G DU monitor thread
:param emulation_env_config: the emulation env config
:param ip: IP of the container
:param logger: the logger to use for logging
:return: None
"""
du_status_dto = FiveGDUController.get_five_g_du_status_by_ip_and_port(
ip=ip, port=emulation_env_config.five_g_config.five_g_du_manager_port)
if not du_status_dto.monitor_running:
logger.info(f"5G DU monitor thread is not running on {ip}, starting it.")
# Open a gRPC session
with grpc.insecure_channel(
f'{ip}:{emulation_env_config.five_g_config.five_g_du_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.five_g_du_manager.five_g_du_manager_pb2_grpc.FiveGDUManagerStub(channel)
csle_collector.five_g_du_manager.query_five_g_du_manager.start_five_g_du_monitor(
stub=stub, kafka_ip=emulation_env_config.kafka_config.container.get_ips()[0],
kafka_port=emulation_env_config.kafka_config.kafka_port,
time_step_len_seconds=emulation_env_config.kafka_config.time_step_len_seconds)
[docs] @staticmethod
def stop_five_g_du_monitor_threads(emulation_env_config: EmulationEnvConfig, logger: logging.Logger,
physical_host_ip: str) -> None:
"""
A method that sends a request to the 5G DU on every container to stop the monitor threads
:param emulation_env_config: the emulation env config
:param physical_host_ip: the IP of the physical host
:param logger: the logger to use for logging
:return: None
"""
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_host_ip:
continue
for container_image in constants.CONTAINER_IMAGES.FIVE_G_DU_IMAGES:
if container_image in c.name:
FiveGDUController.stop_five_g_du_monitor_thread(emulation_env_config=emulation_env_config,
ip=c.docker_gw_bridge_ip, logger=logger)
[docs] @staticmethod
def stop_five_g_du_monitor_thread(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) \
-> None:
"""
A method that sends a request to the 5G DU Manager on a specific container to stop the 5G DU monitor thread
:param emulation_env_config: the emulation env config
:param ip: the IP of the container
:param logger: the logger to use for logging
:return: None
"""
# Open a gRPC session
with grpc.insecure_channel(f'{ip}:{emulation_env_config.five_g_config.five_g_du_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.five_g_du_manager.five_g_du_manager_pb2_grpc.FiveGDUManagerStub(channel)
logger.info(f"Stopping the 5G DU monitor thread on {ip}.")
csle_collector.five_g_du_manager.query_five_g_du_manager.stop_five_g_du_monitor(stub=stub)