Added test functionality for nftables and connection status.

This commit is contained in:
WickedJack99
2023-12-01 22:31:02 +01:00
parent 52b0fbcfee
commit b3b1c842a3

View File

@@ -1,12 +1,32 @@
# echo-server.py
import socket
import ssl
import nftables
import json
import psutil
def get_network_connections_as_string():
kinds = ['inet', 'inet4', 'inet6', 'tcp', 'tcp4', 'tcp6', 'udp', 'udp4', 'udp6', 'unix', 'all']
network_connections_as_string = ""
network_connections = psutil.net_connections(kind=kinds[0])
for conn in network_connections:
network_connections_as_string += str(conn) + "\n"
return network_connections_as_string
def fetch_nftables_config():
nft = nftables.Nftables()
nft.set_json_output(True)
rc,output,error = nft.cmd("list ruleset")
return output
def start_server():
host = '127.0.0.1'
port = 5000
# Create an SSL context
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_3
ssl_context.load_cert_chain(certfile='server.crt', keyfile='server.key')
# Create a socket and bind it to the specified address and port
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((host, port))
@@ -19,18 +39,26 @@ def start_server():
client_socket, client_address = server_socket.accept()
print(f"Accepted connection from {client_address}")
# Wrap the client socket with SSL
ssl_socket = ssl.wrap_socket(client_socket, server_side=True, certfile='server.crt', keyfile='server.key', ssl_version=ssl.PROTOCOL_TLS)
# Wrap the client socket with SSL using the SSL context
ssl_socket = ssl_context.wrap_socket(client_socket, server_side=True)
# Read data from the client
data = ssl_socket.recv(1024).decode('utf-8')
print(f"Received from client: {data}")
try:
# Read data from the client
data = ssl_socket.recv(1024).decode('utf-8')
print(f"Received from client: {data}")
# Send a response to the client
ssl_socket.send("Hello, Client!".encode('utf-8'))
# Create output string to send to client
output = fetch_nftables_config()
stringToSend = output
network_connections = get_network_connections_as_string()
stringToSend += network_connections
#print("Data sent to client:\n" + stringToSend)
# Send a response to the client
ssl_socket.send(stringToSend.encode('utf-8'))
# Close the connection
ssl_socket.close()
finally:
# Close the connection
ssl_socket.close()
if __name__ == "__main__":
start_server()