commit 427011901b3bdfe2ecc4efdfaf35f3aa65b64f49 Author: RinRi Date: Sat May 27 13:35:02 2023 +0300 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3de6175 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +lec diff --git a/final/Final Exam - Practical Part.html b/final/Final Exam - Practical Part.html new file mode 100644 index 0000000..c5c0eae --- /dev/null +++ b/final/Final Exam - Practical Part.html @@ -0,0 +1,326 @@ + + + + + + + + + + + + + Final Exam - Practical Part - HackMD + + + + + + + + + + + + + + + + + +

Final Exam - Practical Part

+

Distributed Systems and Network Programming - Spring 2023

+

Task 1 - UDP Socket Programming [8.5 Points]

Write a UDP server and a client using Python socket module to implement a minimalistic DNS service

Client (client.py)

Server (server.py)

Example Run

$ python3 server.py
+Server: listening on 0.0.0.0:50000
+Client: {'type': 'A', 'key': 'example.com'}
+Server: Record found. Sending answer.
+Client: {'type': 'PTR', 'key': '1.2.3.4'}
+Server: Record found. Sending answer.
+Client: {'type': 'CNAME', 'key': 'moodle.com'}
+Server: Record not found. Sending error.
+^CServer: Shutting down...
+
$ python3 client.py
+Client: Sending query for example.com
+Server: {'type': 'A', 'key': 'example.com', 'value': '1.2.3.4'}
+Client: Sending query for 1.2.3.4
+Server: {'type': 'PTR', 'key': '1.2.3.4', 'value': 'example.com'}
+Client: Sending query for moodle.com
+Server: {'type': 'CNAME', 'key': 'moodle.com', 'value': 'NXDOMAIN'}
+

Task 2 - Remote Procedure Call [8.5 Points]

Write a server and a client to implement a simple remote calculator

Server (server.py)

Client (client.py)

Schema file (calculator.proto)

Example Run

$ python3 server.py
+gRPC server is listening on 0.0.0.0:50000
+Add(10,2)
+Subtract(10,2)
+Multiply(10,2)
+Divide(10,2)
+Divide(10,0)
+^CShutting down...
+
$ python3 client.py
+Add(10,2) = 12.0
+Subtract(10,2) = 8.0
+Multiply(10,2) = 20.0
+Divide(10,2) = 5.0
+Divide(10,0) = nan
+
+ + + + + + + + + diff --git a/final/reference/client.py b/final/reference/client.py new file mode 100644 index 0000000..d8cc4c0 --- /dev/null +++ b/final/reference/client.py @@ -0,0 +1,22 @@ +from xmlrpc.client import ServerProxy +import socket +import time + +PORT = 1234 +CLUSTER = [1, 2, 3] +LOGS = ['SET 5', 'ADD 1'] + +if __name__ == '__main__': + time.sleep(10) # Wait for leader election processs + print('Client started') + for node_id in CLUSTER: + try: + with ServerProxy(f'http://node_{node_id}:{PORT}') as node: + if node.is_leader(): + print(f"Node {node_id} is the cluster leader. Sending logs") + for log in LOGS: + if node.leader_receive_log(log): + print(f"Leader committed '{log}'") + time.sleep(5) # Wait for entries to propagate + except socket.error as e: + print(f"Failed to connect to node_{node_id}: {e}") diff --git a/final/reference/node_AmirlanSharipov.py b/final/reference/node_AmirlanSharipov.py new file mode 100644 index 0000000..87a1d5d --- /dev/null +++ b/final/reference/node_AmirlanSharipov.py @@ -0,0 +1,205 @@ +import random +import sched +import socket +import time +from threading import Thread +from argparse import ArgumentParser +from enum import Enum +from xmlrpc.client import ServerProxy +from xmlrpc.server import SimpleXMLRPCServer + +PORT = 1234 +CLUSTER = [1, 2, 3] +ELECTION_TIMEOUT = (6, 8) +HEARTBEAT_INTERVAL = 5 + + +class NodeState(Enum): + """Enumerates the three possible node states (follower, candidate, or leader)""" + FOLLOWER = 1 + CANDIDATE = 2 + LEADER = 3 + + +class Node: + def __init__(self, node_id): + """Non-blocking procedure to initialize all node parameters and start the first election timer""" + self.node_id = node_id + self.state = NodeState.FOLLOWER + self.term = 0 + self.votes = {} + self.log = [] + self.pending_entry = '' + self.sched = sched.scheduler() + self.event = '' + # TODO: start election timer for this node + self.reset_election_timer() + print(f"Node started! State: {self.state}. 
Term: {self.term}") + + def is_leader(self): + """Returns True if this node is the elected cluster leader and False otherwise""" + if self.state == NodeState.LEADER: + return True + return False + + def reset_election_timer(self): + """Resets election timer for this (follower or candidate) node and returns it to the follower state""" + self.state = NodeState.FOLLOWER + + q = self.sched.queue + for event in q: + self.sched.cancel(event) + + #if (self.node_id == 1 or self.node_id == 3): + # self.sched.enter(0, 1, self.hold_election, ()) + # return + self.sched.enter(random.uniform(ELECTION_TIMEOUT[0], ELECTION_TIMEOUT[1]), 1, self.hold_election, ()) + + def reset_heartbeat_timer(self): + q = self.sched.queue + for event in q: + self.sched.cancel(event) + + self.sched.enter(HEARTBEAT_INTERVAL, 1, self.append_entries, ()) + + def hold_election(self): + """Called when this follower node is done waiting for a message from a leader (election timeout) + The node increments term number, becomes a candidate and votes for itself. + Then call request_vote over RPC for all other online nodes and collects their votes. + If the node gets the majority of votes, it becomes a leader and starts the hearbeat timer + If the node loses the election, it returns to the follower state and resets election timer. + """ + self.term = self.term + 1 + self.state = NodeState.CANDIDATE + self.votes = {} + self.votes[self.node_id] = True + print(f'New election term {self.term}. 
State: {self.state}') + + for n0 in CLUSTER: + if node == self.node_id: + continue + + try: + print(f'Requesting vote from node {n0}') + with ServerProxy(f'http://node_{n0}:{PORT}') as proxy: + if proxy.request_vote(self.term, self.node_id): + self.votes[n0] = True + else: + self.votes[n0] = False + except Exception as e: + print(f"couldn't request_vote from {n0}") + print(traceback.format_exc()) + print(e) + + if sum(self.votes.values()) > len(CLUSTER) / 2: + self.state = NodeState.LEADER + self.reset_heartbeat_timer() + + print(f"New election term {self.term}. State: {self.state}") + + def request_vote(self, term, candidate_id): + """Called remotely when a node requests voting from other nodes. + Updates the term number if the received one is greater than `self.term` + A node rejects the vote request if it's a leader or it already voted in this term. + Returns True and update `self.votes` if the vote is granted to the requester candidate and False otherwise. + """ + + print(f"Got a vote request from {candidate_id} (term={term})") + self.reset_election_timer() + + if term > self.term: + self.term = term + self.votes = {} + + if self.is_leader() or len(self.votes) > 0: + return False + + self.votes[candidate_id] = True + return True + + def append_entries(self): + """Called by leader every HEARTBEAT_INTERVAL, sends a heartbeat message over RPC to all online followers. 
+ Accumulates ACKs from followers for a pending log entry (if any) + If the majority of followers ACKed the entry, the entry is committed to the log and is no longer pending + """ + print("Sending a heartbeat to followers") + + acks = 0 + for n0 in CLUSTER: + if n0 == self.node_id: + continue + + try: + with ServerProxy(f'http://node_{n0}:{PORT}') as proxy: + if proxy.heartbeat(self.pending_entry): + acks = acks + 1 + except Exception as e: + print(f"couldn't heartbeat {n0}") + print(traceback.format_exc()) + print(e) + + if self.pending_entry != '' and acks > len(CLUSTER) / 2: + self.log.append(self.pending_entry) + print(f'Leader commited \'{self.pending_entry}\'') + self.pending_entry = '' + + self.reset_heartbeat_timer() + + def heartbeat(self, leader_entry): + """Called remotely from the leader to inform followers that it's alive and supply any pending log entry + Followers would commit an entry if it was pending before, but is no longer now. + Returns True to ACK the heartbeat and False on any problems. + """ + print(f"Heartbeat received from leader (entry='{leader_entry}')") + try: + self.reset_election_timer() + if self.pending_entry != '' and leader_entry != self.pending_entry: + self.log.append(self.pending_entry) + print(f'Follower commited \'{self.pending_entry}\'') + + self.pending_entry = leader_entry + + return True + except Exception as e: + return False + + def leader_receive_log(self, log): + """Called remotely from the client. Executed only by the leader upon receiving a new log entry + Returns True after the entry is committed to the leader log and False on any problems + """ + print(f"Leader received log \'{log}\' from client") + while self.pending_entry != '': + time.sleep(1) + + self.pending_entry = log + time.sleep(7) + if self.pending_entry == '' and self.log[-1] == log: + return True + return False + + +if __name__ == '__main__': + # TODO: Parse one integer argument (node_id), then create the node with that ID. 
+ # TODO: Start RPC server on 0.0.0.0:PORT and expose the node instance + # TODO: Run the node scheduler in an isolated thread. + # TODO: Handle KeyboardInterrupt and terminate gracefully. + try: + parser = ArgumentParser() + parser.add_argument('node_id') + args = parser.parse_args() + node = Node(int(args.node_id)) + + t = Thread(target=node.sched.run) + t.start() + + server = SimpleXMLRPCServer(('0.0.0.0', PORT), logRequests=False) + print(f"Listening on port {PORT}...") + server.register_function(node.leader_receive_log, "leader_receive_log") + server.register_function(node.heartbeat, "heartbeat") + server.register_function(node.request_vote, "request_vote") + server.register_function(node.is_leader, "is_leader") + server.serve_forever() + except KeyboardInterrupt: + print("node killed...") + exit() + diff --git a/final/task1/client.py b/final/task1/client.py new file mode 100644 index 0000000..1772a85 --- /dev/null +++ b/final/task1/client.py @@ -0,0 +1,34 @@ +import socket +import json + +MSS = 20476 # MSS = Server buffer size (20480) - data header size (4) + +class Query: + type = '' + key = '' + + def __init__(self, type, key): + self.type = type + self.key = key + + def json_str(self): + return json.dumps(self.__dict__) + +def await_response(s): + data, addr = s.recvfrom(MSS) + print(f'Server: {json.loads(data)}') + +if __name__ == "__main__": + server_ip, server_port = ('127.0.0.1', 50000) + + queries = (Query('A', 'example.com'), + Query('PTR', '1.2.3.4'), + Query('CNAME', 'moodle.com')) + + with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as s: + for query in queries: + print(f'Performing query {query.json_str()}:') + s.sendto(query.json_str().encode(), (server_ip, server_port)) + + await_response(s) + diff --git a/final/task1/server.py b/final/task1/server.py new file mode 100644 index 0000000..f8b516a --- /dev/null +++ b/final/task1/server.py @@ -0,0 +1,51 @@ +import socket +import json + +IP_ADDR = "0.0.0.0" +PORT = 50000 +MSS = 
24000 # MSS = Server buffer size (20480) - data header size (4) + +class RR: + type = '' + key = '' + value = '' + + def __init__(self, type, key, value): + self.type = type + self.key = key + self.value = value + + def json_str(self): + return json.dumps(self.__dict__) + +if __name__ == "__main__": + server_port = PORT + + records = (RR('A', 'example.com', '1.2.3.4'), RR('PTR', '1.2.3.4', 'example.com')) + + with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as s: + s.bind((IP_ADDR, server_port)) + print(f'Listening on {IP_ADDR}:{server_port}...') + + try: + while True: + data, addr = s.recvfrom(MSS) + query = json.loads(data) + print(f'Client {query}') + ok = False + + for record in records: + if record.type == query['type'] and record.key == query['key']: + print('Server: Record found. Sending answer.') + s.sendto(record.json_str().encode(), addr) + ok = True + break + + if ok == False: + print('Server: Record not found. Sending error.') + record = RR(query['type'], query['key'], 'NXDOMAIN') + s.sendto(record.json_str().encode(), addr) + + except KeyboardInterrupt: + print('Server: Interrupted by user. 
Exiting') + exit(0) diff --git a/final/task2/AmirlanSharipov/.gitignore b/final/task2/AmirlanSharipov/.gitignore new file mode 100644 index 0000000..bee8a64 --- /dev/null +++ b/final/task2/AmirlanSharipov/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/final/task2/AmirlanSharipov/calculator.proto b/final/task2/AmirlanSharipov/calculator.proto new file mode 100644 index 0000000..a0dcbc4 --- /dev/null +++ b/final/task2/AmirlanSharipov/calculator.proto @@ -0,0 +1,17 @@ +syntax = 'proto3'; + +service Calculator { + rpc Add(Request) returns (FloatResponse); + rpc Substract(Request) returns (FloatResponse); + rpc Multiply(Request) returns (FloatResponse); + rpc Divide(Request) returns (FloatResponse); +} + +message Request { + int32 a = 1; + int32 b = 2; +} + +message FloatResponse { + float ans = 1; +} diff --git a/final/task2/AmirlanSharipov/calculator_pb2.py b/final/task2/AmirlanSharipov/calculator_pb2.py new file mode 100644 index 0000000..ad7bffa --- /dev/null +++ b/final/task2/AmirlanSharipov/calculator_pb2.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! 
+# source: calculator.proto +"""Generated protocol buffer code.""" +from google.protobuf.internal import builder as _builder +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x63\x61lculator.proto\"\x1f\n\x07Request\x12\t\n\x01\x61\x18\x01 \x01(\x05\x12\t\n\x01\x62\x18\x02 \x01(\x05\"\x1c\n\rFloatResponse\x12\x0b\n\x03\x61ns\x18\x01 \x01(\x02\x32\x9e\x01\n\nCalculator\x12\x1f\n\x03\x41\x64\x64\x12\x08.Request\x1a\x0e.FloatResponse\x12%\n\tSubstract\x12\x08.Request\x1a\x0e.FloatResponse\x12$\n\x08Multiply\x12\x08.Request\x1a\x0e.FloatResponse\x12\"\n\x06\x44ivide\x12\x08.Request\x1a\x0e.FloatResponseb\x06proto3') + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'calculator_pb2', globals()) +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _REQUEST._serialized_start=20 + _REQUEST._serialized_end=51 + _FLOATRESPONSE._serialized_start=53 + _FLOATRESPONSE._serialized_end=81 + _CALCULATOR._serialized_start=84 + _CALCULATOR._serialized_end=242 +# @@protoc_insertion_point(module_scope) diff --git a/final/task2/AmirlanSharipov/calculator_pb2_grpc.py b/final/task2/AmirlanSharipov/calculator_pb2_grpc.py new file mode 100644 index 0000000..31a05fe --- /dev/null +++ b/final/task2/AmirlanSharipov/calculator_pb2_grpc.py @@ -0,0 +1,165 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +import calculator_pb2 as calculator__pb2 + + +class CalculatorStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. 
+ + Args: + channel: A grpc.Channel. + """ + self.Add = channel.unary_unary( + '/Calculator/Add', + request_serializer=calculator__pb2.Request.SerializeToString, + response_deserializer=calculator__pb2.FloatResponse.FromString, + ) + self.Substract = channel.unary_unary( + '/Calculator/Substract', + request_serializer=calculator__pb2.Request.SerializeToString, + response_deserializer=calculator__pb2.FloatResponse.FromString, + ) + self.Multiply = channel.unary_unary( + '/Calculator/Multiply', + request_serializer=calculator__pb2.Request.SerializeToString, + response_deserializer=calculator__pb2.FloatResponse.FromString, + ) + self.Divide = channel.unary_unary( + '/Calculator/Divide', + request_serializer=calculator__pb2.Request.SerializeToString, + response_deserializer=calculator__pb2.FloatResponse.FromString, + ) + + +class CalculatorServicer(object): + """Missing associated documentation comment in .proto file.""" + + def Add(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def Substract(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def Multiply(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def Divide(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def 
add_CalculatorServicer_to_server(servicer, server): + rpc_method_handlers = { + 'Add': grpc.unary_unary_rpc_method_handler( + servicer.Add, + request_deserializer=calculator__pb2.Request.FromString, + response_serializer=calculator__pb2.FloatResponse.SerializeToString, + ), + 'Substract': grpc.unary_unary_rpc_method_handler( + servicer.Substract, + request_deserializer=calculator__pb2.Request.FromString, + response_serializer=calculator__pb2.FloatResponse.SerializeToString, + ), + 'Multiply': grpc.unary_unary_rpc_method_handler( + servicer.Multiply, + request_deserializer=calculator__pb2.Request.FromString, + response_serializer=calculator__pb2.FloatResponse.SerializeToString, + ), + 'Divide': grpc.unary_unary_rpc_method_handler( + servicer.Divide, + request_deserializer=calculator__pb2.Request.FromString, + response_serializer=calculator__pb2.FloatResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'Calculator', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. 
+class Calculator(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def Add(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/Calculator/Add', + calculator__pb2.Request.SerializeToString, + calculator__pb2.FloatResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def Substract(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/Calculator/Substract', + calculator__pb2.Request.SerializeToString, + calculator__pb2.FloatResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def Multiply(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/Calculator/Multiply', + calculator__pb2.Request.SerializeToString, + calculator__pb2.FloatResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def Divide(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/Calculator/Divide', + calculator__pb2.Request.SerializeToString, + calculator__pb2.FloatResponse.FromString, + options, channel_credentials, + insecure, 
call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/final/task2/AmirlanSharipov/client.py b/final/task2/AmirlanSharipov/client.py new file mode 100644 index 0000000..a5134ed --- /dev/null +++ b/final/task2/AmirlanSharipov/client.py @@ -0,0 +1,40 @@ +import grpc +import calculator_pb2 as service +import calculator_pb2_grpc as stub + +import random + + +def Add(a, b): + args = service.Request(a=a, b=b) + response = stub.Add(args) + print(f"{a} + {b} = {response.ans}") + + +def Substract(a, b): + args = service.Request(a=a, b=b) + response = stub.Substract(args) + print(f"{a} - {b} = {response.ans}") + + +def Multiply(a, b): + args = service.Request(a=a, b=b) + response = stub.Multiply(args) + print(f"{a} * {b} = {response.ans}") + + +def Divide(a, b): + args = service.Request(a=a, b=b) + response = stub.Divide(args) + print(f"{a} / {b} = {response.ans}") + + +if __name__ == '__main__': + with grpc.insecure_channel('localhost:1234') as channel: + stub = stub.CalculatorStub(channel) + + Add(10, 2) + Substract(10, 2) + Multiply(10, 2) + Divide(10, 2) + Divide(10, 0) diff --git a/final/task2/AmirlanSharipov/server.py b/final/task2/AmirlanSharipov/server.py new file mode 100644 index 0000000..8441db2 --- /dev/null +++ b/final/task2/AmirlanSharipov/server.py @@ -0,0 +1,49 @@ +from concurrent.futures import ThreadPoolExecutor + +import grpc +import calculator_pb2 as stub +import calculator_pb2_grpc as service + +import math + +SERVER_ADDR = '0.0.0.0:1234' + +class Calculator(service.CalculatorServicer): + def Add(self, request, context): + a = request.a + b = request.b + print(f'Add({a}, {b})') + return stub.FloatResponse(ans=a+b) + + def Substract(self, request, context): + a = request.a + b = request.b + print(f'Substract({a}, {b})') + return stub.FloatResponse(ans=a-b) + + def Multiply(self, request, context): + a = request.a + b = request.b + print(f'Multiply({a}, {b})') + return stub.FloatResponse(ans=a*b) + + def Divide(self, request, 
context): + a = request.a + b = request.b + print(f'Divide({a}, {b})') + try: + return stub.FloatResponse(ans=a/b) + except Exception as e: + return stub.FloatResponse(ans=math.nan) + + +if __name__ == '__main__': + try: + server = grpc.server(ThreadPoolExecutor(max_workers=30)) + service.add_CalculatorServicer_to_server(Calculator(), server) + server.add_insecure_port(SERVER_ADDR) + server.start() + print(f'listening on {SERVER_ADDR}') + server.wait_for_termination() + except KeyboardInterrupt: + print('Interrupted by user. Shutting down...') diff --git a/lab1/Lab 01.html b/lab1/Lab 01.html new file mode 100644 index 0000000..98ba093 --- /dev/null +++ b/lab1/Lab 01.html @@ -0,0 +1,335 @@ + + + + + + + + + + + + + Week 1 - Stop-and-Wait ARQ - HackMD + + + + + + + + + + + + + + + + + +

Week 1 - Stop-and-Wait ARQ

+

Distributed Systems and Network Programming - Spring 2023

+

Task

Client Implementation

Server Implementation (your task)

    +
  1. Parse one integer argument, the port number to listen on.
  2. +
  3. Create a UDP socket and start listening for incoming messages on 0.0.0.0:<port>. +
      +
    • The server should use a fixed receiver buffer size of 20480 bytes (20 Kibibytes).
    • +
    +
  4. +
  5. Upon receiving a message from a client, inspect the message type (first character). +
      +
    • If the message type is s, prepare to receive a file from the client with the given name and size.
    • +
    • If the message type is d, write the delivered chunk to the file system.
    • +
    • Otherwise, terminate gracefully with an error.
    • +
    +
  6. +
  7. In both cases, reply with an acknowledge message in the format a|seqno where +
      +
    • a indicates that the message type is acknowledgement.
    • +
    • seqno equals (x+1)%2 where x is the sequence number of the message to be acknowledged.
    • +
    +
  8. +
  9. Once the file is received completely, the server should print an indicating message, write the content to the file system, and close the file.
  10. +
  11. If an existing file with the same name is present in the server directory, the server should print an indicating message and overwrite that file with the new one.
  12. +
  13. Your server will be tested under constant delay and packet loss. The following Linux command can be used to simulate 15% packet loss and 1100 milliseconds constant delay over the lo interface. File transfer should still succeed after applying the command. To undo the effect use del instead of add.
    sudo tc qdisc add dev lo root netem loss 15% delay 1100ms
    +
    +
  14. +
  15. The server stays running unless a fatal error occurs or a KeyboardInterrupt is received.
  16. +

Testing

ClientServers | 0 | note.txt | 2000a | 1d | 1 | chunk1a | 0ClientServer

Checklist

Your submitted code should satisfy the following requirements. Failing to satisfy an item will result in partial grade deduction or an assignment failure (depending on the severity).

+ + + + + + + + + diff --git a/lab1/client/client.py b/lab1/client/client.py new file mode 100644 index 0000000..1da18be --- /dev/null +++ b/lab1/client/client.py @@ -0,0 +1,63 @@ +import argparse +import math +import os +import socket + +MSS = 20476 # MSS = Server buffer size (20480) - data header size (4) + + +def await_ack(packet): + s.settimeout(1) + while True: + try: + data, addr = s.recvfrom(1024) + print(f"Server: {data.decode()}") + received_ack_seqno = int(data[2:3].decode()) + expected_ack_seqno = (int(packet[2:3].decode()) + 1) % 2 + if received_ack_seqno != expected_ack_seqno: + continue + return True + except KeyboardInterrupt: + print("Client: Exiting...") + exit() + except socket.timeout: # Expected ACK was not received within one second + print(f"Client: Retransmitting...") + s.sendto(packet, (server_ip, server_port)) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("server_addr", type=str) + parser.add_argument("file_path", type=str) + args = parser.parse_args() + + server_ip, server_port = args.server_addr.split(":") + server_port = int(server_port) + file_name = args.file_path.split(os.path.sep)[-1] + file_size = os.path.getsize(args.file_path) + + with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as s: + s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 30000) + + # Check file existence + if not os.path.exists(args.file_path): + print(f"Client: no such file: {args.file_path}") + exit() + + # Send start packet to server + packet = f"s|0|{file_name}|{file_size}".encode() + print(f"Client: {packet.decode()}") + s.sendto(packet, (server_ip, server_port)) + + # Wait for ACK for the given packet + await_ack(packet) + + # Upload file to server + seqno = 1 + with open(args.file_path, "rb") as f: + for i in range(math.ceil(file_size / MSS)): + print(f"Client: d|{seqno}|chunk{i + 1}") + packet = bytes(f"d|{seqno}|", "utf-8") + f.read(MSS) + s.sendto(packet, (server_ip, server_port)) + 
await_ack(packet) + seqno = (seqno + 1) % 2 diff --git a/lab1/client/image.png b/lab1/client/image.png new file mode 100644 index 0000000..454aca9 Binary files /dev/null and b/lab1/client/image.png differ diff --git a/lab1/client/note.txt b/lab1/client/note.txt new file mode 100644 index 0000000..1b37687 --- /dev/null +++ b/lab1/client/note.txt @@ -0,0 +1 @@ +Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. diff --git a/lab1/server/image.png b/lab1/server/image.png new file mode 100644 index 0000000..454aca9 Binary files /dev/null and b/lab1/server/image.png differ diff --git a/lab1/server/note.txt b/lab1/server/note.txt new file mode 100644 index 0000000..1b37687 --- /dev/null +++ b/lab1/server/note.txt @@ -0,0 +1 @@ +Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. 
diff --git a/lab1/server/server.py b/lab1/server/server.py new file mode 100644 index 0000000..87e6f59 --- /dev/null +++ b/lab1/server/server.py @@ -0,0 +1,75 @@ +import argparse +import math +import os +import socket + +IP_ADDR = "0.0.0.0" +MSS = 24000 # MSS = Server buffer size (20480) - data header size (4) + + +def await_ack(packet, addr): + s.settimeout(1) + while True: + try: + data, addr = s.recvfrom(MSS) + return (data, addr) + except KeyboardInterrupt: + print("Server: Exiting...") + exit() + except socket.timeout: + print("Server: Retransmitting...") + s.sendto(packet, addr) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("port", type=str) + args = parser.parse_args() + + server_port = args.port + server_port = int(server_port) + + with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as s: + s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 30000) + s.bind((IP_ADDR, server_port)) + print(f'Listening on {IP_ADDR}:{server_port}...') + + data, addr = s.recvfrom(MSS) + columns = data.decode('utf-8', 'replace').split('|') + print(columns) + if os.path.isfile(f'./{columns[2]}'): + print(f'file {columns[2]} exists. 
Overwriting...') + f = open(columns[2], 'wb') + + if columns[0] == 's': + message = f'a|{(int(columns[1])+1)%2}' + s.sendto(message.encode(), addr) + data, addr = await_ack(message.encode(), addr) + + bytes_received = 0 + last = b'0' + while bytes_received < int(columns[3]): + new_columns = data.split(b'|') + if new_columns[0] != b'd': + data, addr = await_ack(message.encode(), addr) + continue + if new_columns[1] == last: + data, addr = await_ack(message.encode(), addr) + continue + + last = new_columns[1] + + bindata = new_columns[2] + for i in new_columns[3:]: + bindata = bindata + (b'|') + i + f.write(bindata) + bytes_received += len(bindata) + print(f'Received: {bytes_received}/{columns[3]}') + + message = f'a|{(int(new_columns[1])+1)%2}' + s.sendto(message.encode(), addr) + data, addr = await_ack(message.encode(), addr) + + print(f'file {columns[2]} was successfully uploaded. Exiting...') + else: + exit(1) diff --git a/lab2/.gitignore b/lab2/.gitignore new file mode 100644 index 0000000..9e75ce1 --- /dev/null +++ b/lab2/.gitignore @@ -0,0 +1,2 @@ +frames +AmirlanSharipov.gif diff --git a/lab2/AmirlanSharipov_client.py b/lab2/AmirlanSharipov_client.py new file mode 100644 index 0000000..2ef3954 --- /dev/null +++ b/lab2/AmirlanSharipov_client.py @@ -0,0 +1,81 @@ +# BEFORE CLIENT OPT: +# Frames download time: 4.928671836853027 +# GIF creation time: 5.02190637588501 +# AFTER CLIENT OPT: +# Frames download time: 3.885207176208496 +# GIF creation time: 4.356576204299927 + +import os +import socket +import time +import threading +import multiprocessing + +from PIL import Image + +SERVER_URL = '127.0.0.1:1234' +FILE_NAME = 'AmirlanSharipov.gif' +CLIENT_BUFFER = 1024 +FRAME_COUNT = 5000 +MAXTHREADS = 8 +MAXPROCESSES = 8 + +pool_sema = threading.BoundedSemaphore(value=MAXTHREADS) + + +def routine_save_image(i): + with pool_sema: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + ip, port = SERVER_URL.split(':') + s.connect((ip, int(port))) + image = b'' + 
while True: + packet = s.recv(CLIENT_BUFFER) + if not packet: + break + image += packet + + with open(f'frames/{i}.png', 'wb') as f: + f.write(image) + + +def download_frames(): + t0 = time.time() + if not os.path.exists('frames'): + os.mkdir('frames') + + threads = list() + for i in range(FRAME_COUNT): + t = threading.Thread(target=routine_save_image, args=(i, )) + threads.append(t) + + for t in threads: + t.start() + + for t in threads: + t.join() + + return time.time() - t0 + + +def get_RGBA(fname): + return Image.open(fname).convert('RGBA') + + +def create_gif(): + t0 = time.time() + frame_list = list() + for frame_id in range(FRAME_COUNT): + frame_list.append(f'frames/{frame_id}.png') + + with multiprocessing.Pool(MAXPROCESSES) as p: + frames = p.map(get_RGBA, frame_list) + + frames[0].save(FILE_NAME, format="GIF", + append_images=frames[1:], save_all=True, duration=500, loop=0) + return time.time() - t0 + + +if __name__ == '__main__': + print(f"Frames download time: {download_frames()}") + print(f"GIF creation time: {create_gif()}") diff --git a/lab2/AmirlanSharipov_server.py b/lab2/AmirlanSharipov_server.py new file mode 100644 index 0000000..6914717 --- /dev/null +++ b/lab2/AmirlanSharipov_server.py @@ -0,0 +1,58 @@ +import os +import socket +import time +import random +import threading +import io + +from PIL import Image + +SERVER_URL = '0.0.0.0:1234' +FRAME_COUNT = 5000 +BACKLOG = 100 + + +def routine_send_img(connection): + img = Image.new(mode="RGBA", size=(10, 10)) + pix = img.load() + for i in range(10): + for j in range(10): + r = random.randrange(0, 255) + g = random.randrange(0, 255) + b = random.randrange(0, 255) + pix[i, j] = (r, g, b) + + output = io.BytesIO() + img.save(output, format='PNG') + connection.send(output.getvalue()) + connection.close() + + +def main(): + ip, port = SERVER_URL.split(':') + port = int(port) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind((ip, 
port)) + + s.listen(BACKLOG) + print(f'Listening on port {port}...') + + threads = list() + while True: + connection, addr = s.accept() + #print(f'connected from {addr}...') + + t = threading.Thread(target=routine_send_img, args=(connection, )) + threads.append(t) + t.start() + print(f'Sending an image to {addr}...') + + for t in threads: + t.join() + + s.close() + + +if __name__ == '__main__': + main() diff --git a/lab2/AmirlanSharipov_timediff.txt b/lab2/AmirlanSharipov_timediff.txt new file mode 100644 index 0000000..1a77292 --- /dev/null +++ b/lab2/AmirlanSharipov_timediff.txt @@ -0,0 +1,7 @@ +BEFORE CLIENT OPT: +Frames download time: 4.928671836853027 +GIF creation time: 5.02190637588501 + +AFTER CLIENT MULTIPROCESSING GIF: +Frames download time: 3.885207176208496 +GIF creation time: 4.356576204299927 diff --git a/lab2/Lab 02.html b/lab2/Lab 02.html new file mode 100644 index 0000000..41c2819 --- /dev/null +++ b/lab2/Lab 02.html @@ -0,0 +1,278 @@ + + + + + + + + + + + + + Week 2 - Concurrency and Parallelism - HackMD + + + + + + + + + + + + + + + + + +

Week 2 - Concurrency and Parallelism

+

Distributed Systems and Network Programming - Spring 2023

+

Task

Your tasks for this lab:

Server Implementation

Client Implementation

The client does the following:

    +
  1. Connect to the TCP server multiple times to download 5000 images, one by one.
  2. +
  3. Download the images to a directory called frames (creating the directory if it does not exist).
  4. +
  5. Create a GIF by combining the downloaded frames.
  6. +
  7. Use time module to calculate the total time taken for frame download and GIF generation.
  8. +

Your task

    +
  1. Once you understand how the client code works, start by writing the server.
  2. +
  3. Once the server works fine, it’s time to optimize the runtime of the client.
  4. +
  5. Use threading to spawn multiple threads that download the required frames concurrently.
  6. +
  7. Use multiprocessing to spawn multiple processes (not more than your CPU core count) to process the frames in parallel to create the GIF faster. You may use multiprocessing.Pool() to achieve the task.
  8. +
  9. Check the time taken in each stage and verify that the client runtime was improved.
  10. +

Example run

$ python3 NameSurname_server.py
+Listening on 0.0.0.0:1234
+Sent an image to (127.0.0.1, 50125)
+Sent an image to (127.0.0.1, 58754)
+...
+
+# Before optimizing client
+$ python3 NameSurname_client.py
+Frames download time: 25.516422748565674
+GIF creation time: 30.278062343597412
+
+# After optimizing client
+$ python3 NameSurname_client.py
+Frames download time: 18.751099348068237
+GIF creation time: 9.695139408111572
+

Checklist

Your submitted code should satisfy the following requirements. Failing to satisfy an item will result in partial grade deduction or an assignment failure (depending on the severity).

+ + + + + + + + + diff --git a/lab2/NameSurname.gif b/lab2/NameSurname.gif new file mode 100644 index 0000000..c75e9d7 Binary files /dev/null and b/lab2/NameSurname.gif differ diff --git a/lab3/AmirlanSharipov/.gitignore b/lab3/AmirlanSharipov/.gitignore new file mode 100644 index 0000000..04c0fd4 --- /dev/null +++ b/lab3/AmirlanSharipov/.gitignore @@ -0,0 +1,2 @@ +venv +rabbitmq diff --git a/lab3/AmirlanSharipov/docker-compose.yml b/lab3/AmirlanSharipov/docker-compose.yml new file mode 100644 index 0000000..e415e4d --- /dev/null +++ b/lab3/AmirlanSharipov/docker-compose.yml @@ -0,0 +1,14 @@ +version: "3.2" +services: + rabbitmq3: + container_name: "rabbitmq" + image: rabbitmq:3.8-management-alpine + environment: + - RABBITMQ_DEFAULT_USER=rabbit + - RABBITMQ_DEFAULT_PASS=1234 + ports: + - '5672:5672' # AMQP protocol port + - '15672:15672' # HTTP management UI + volumes: + - ./rabbitmq/data/:/var/lib/rabbitmq/ + - ./rabbitmq/log/:/var/log/rabbitmq diff --git a/lab3/AmirlanSharipov/publishers/control-tower.py b/lab3/AmirlanSharipov/publishers/control-tower.py new file mode 100644 index 0000000..212f6e2 --- /dev/null +++ b/lab3/AmirlanSharipov/publishers/control-tower.py @@ -0,0 +1,45 @@ +import json + +from pika import BlockingConnection, ConnectionParameters, PlainCredentials +from pika.exchange_type import ExchangeType + +RMQ_HOST = 'localhost' +RMQ_USER = 'rabbit' +RMQ_PASS = '1234' +EXCHANGE_NAME = 'amq.topic' +ROUTING_KEY_CURRENT = 'rep.current' +ROUTING_KEY_AVG = 'rep.average' + + +def main(): + connection = BlockingConnection( + ConnectionParameters( + host=RMQ_HOST, + credentials=PlainCredentials(RMQ_USER, RMQ_PASS) + ) + ) + try: + channel = connection.channel() + result = channel.queue_declare(queue=ROUTING_KEY_CURRENT) + channel.queue_bind(exchange=EXCHANGE_NAME, queue=result.method.queue) + result = channel.queue_declare(queue=ROUTING_KEY_AVG) + channel.queue_bind(exchange=EXCHANGE_NAME, queue=result.method.queue) + + while True: + query = input('Enter 
Query: ') + if query == 'current': + channel.basic_publish(exchange=EXCHANGE_NAME, + routing_key=ROUTING_KEY_CURRENT, + body=query) + if query == 'average': + channel.basic_publish(exchange=EXCHANGE_NAME, + routing_key=ROUTING_KEY_AVG, + body=query) + + except KeyboardInterrupt: + connection.close() + print('Interrupted by user. Shutting down...') + + +if __name__ == '__main__': + main() diff --git a/lab3/AmirlanSharipov/publishers/sensor.py b/lab3/AmirlanSharipov/publishers/sensor.py new file mode 100644 index 0000000..9142523 --- /dev/null +++ b/lab3/AmirlanSharipov/publishers/sensor.py @@ -0,0 +1,40 @@ +import json +from datetime import datetime + +from pika import BlockingConnection, ConnectionParameters, PlainCredentials +from pika.exchange_type import ExchangeType + +RMQ_HOST = 'localhost' +RMQ_USER = 'rabbit' +RMQ_PASS = '1234' +EXCHANGE_NAME = 'amq.topic' +ROUTING_KEY = 'co2.sensor' + + +def main(): + connection = BlockingConnection( + ConnectionParameters( + host=RMQ_HOST, + credentials=PlainCredentials(RMQ_USER, RMQ_PASS) + ) + ) + try: + channel = connection.channel() + result = channel.queue_declare(queue=ROUTING_KEY) + channel.queue_bind(exchange=EXCHANGE_NAME, queue=result.method.queue) + + while True: + co2 = int(input('Enter CO2 level: ')) + message = json.dumps({'time': str(datetime.utcnow()), 'value': co2}) + print(message) + channel.basic_publish(exchange=EXCHANGE_NAME, + routing_key=ROUTING_KEY, + body=message) + connection.close() + except KeyboardInterrupt: + connection.close() + print('Interrupted by user. 
Shutting down...') + + +if __name__ == '__main__': + main() diff --git a/lab3/AmirlanSharipov/receiver.log b/lab3/AmirlanSharipov/receiver.log new file mode 100644 index 0000000..3c889ea --- /dev/null +++ b/lab3/AmirlanSharipov/receiver.log @@ -0,0 +1,4 @@ +{"time": "2023-04-13 18:39:29.566244", "value": 123} +{"time": "2023-04-13 18:39:30.650069", "value": 500} +{"time": "2023-04-13 18:39:31.322338", "value": 501} +{"time": "2023-04-13 19:09:42.439397", "value": 1234} diff --git a/lab3/AmirlanSharipov/subscribers/receiver.py b/lab3/AmirlanSharipov/subscribers/receiver.py new file mode 100644 index 0000000..0e56e1d --- /dev/null +++ b/lab3/AmirlanSharipov/subscribers/receiver.py @@ -0,0 +1,53 @@ +import json + +from pika import BlockingConnection, ConnectionParameters, PlainCredentials +from pika.exchange_type import ExchangeType + +RMQ_HOST = 'localhost' +RMQ_USER = 'rabbit' +RMQ_PASS = '1234' +EXCHANGE_NAME = 'amq.topic' +ROUTING_KEY = 'co2.*' + + +def callback(channel, method, properties, body): + log_file = open('receiver.log', 'a') + log_file.write(body.decode() + '\n') + + message = json.loads(body) + status = 'OK' + if message['value'] > 500: + status = 'WARNING' + print(f"{message['time']}: {status}") + + log_file.close() + channel.basic_ack(delivery_tag=method.delivery_tag) + + +def main(): + connection = BlockingConnection( + ConnectionParameters( + host=RMQ_HOST, + credentials=PlainCredentials(RMQ_USER, RMQ_PASS) + ) + ) + try: + channel = connection.channel() + result = channel.queue_declare(queue=ROUTING_KEY) + channel.queue_bind(exchange=EXCHANGE_NAME, queue=result.method.queue) + + channel.basic_consume(queue=ROUTING_KEY, + on_message_callback=callback) + + print('[*] Waiting for CO2 data. Press CTRL+C to exit') + + channel.start_consuming() + + connection.close() + except KeyboardInterrupt: + connection.close() + print('Interrupted by user. 
Shutting down...') + + +if __name__ == '__main__': + main() diff --git a/lab3/AmirlanSharipov/subscribers/reporter.py b/lab3/AmirlanSharipov/subscribers/reporter.py new file mode 100644 index 0000000..c19eda6 --- /dev/null +++ b/lab3/AmirlanSharipov/subscribers/reporter.py @@ -0,0 +1,60 @@ +import json +import os +from datetime import datetime + +from pika import BlockingConnection, ConnectionParameters, PlainCredentials +from pika.exchange_type import ExchangeType + +RMQ_HOST = 'localhost' +RMQ_USER = 'rabbit' +RMQ_PASS = '1234' +EXCHANGE_NAME = 'amq.topic' +ROUTING_KEY = 'rep.*' + + +def extract_value(dict_line): + return dict_line['value'] + + +def callback(channel, method, properties, body): + log_file = open('receiver.log', 'r') + lines = log_file.readlines() + dict_lines = list(map(json.loads, lines)) + if body == b'current': + print(f"{datetime.utcnow()}: Latest CO2 level is {dict_lines[-1]['value']}") + else: + values = list(map(extract_value, dict_lines)) + avg = sum(values) / len(values) + print(f"{datetime.utcnow()}: Average CO2 level is {avg}") + + log_file.close() + channel.basic_ack(delivery_tag=method.delivery_tag) + + +def main(): + connection = BlockingConnection( + ConnectionParameters( + host=RMQ_HOST, + credentials=PlainCredentials(RMQ_USER, RMQ_PASS) + ) + ) + try: + channel = connection.channel() + result = channel.queue_declare(queue=ROUTING_KEY) + channel.queue_bind(exchange=EXCHANGE_NAME, queue=result.method.queue) + + channel.basic_consume(queue=ROUTING_KEY, + on_message_callback=callback) + + print('[*] Waiting for queries from the control tower. Press CTRL+C to exit') + + channel.start_consuming() + + connection.close() + except KeyboardInterrupt: + connection.close() + print('Interrupted by user. 
Shutting down...') + + +if __name__ == '__main__': + main() diff --git a/lab3/Week 3.html b/lab3/Week 3.html new file mode 100644 index 0000000..0394586 --- /dev/null +++ b/lab3/Week 3.html @@ -0,0 +1,371 @@ + + + + + + + + + + + + + Week 3 - Message Brokers - HackMD + + + + + + + + + + + + + + + + + +

Week 3 - Message Brokers

+

Distributed Systems and Network Programming - Spring 2023

+

Overview

Your task for this lab is to use RabbitMQ as a message broker for a pollution monitoring system.

System Architecture

Architecture diagram

diagram

Directory structure

.
+├── docker-compose.yml
+├── publishers
+│   ├── sensor.py
+│   └── control-tower.py
+└── subscribers
+    ├── receiver.py
+    └── reporter.py
+

Publishers

Subscribers

Message Broker (RabbitMQ Server)

Task

    +
  1. +

    Run RabbitMQ in a docker container using docker-compose by executing the following command in the same directory as the docker-compose.yml file.

    +
    docker-compose up
    +
    +
  2. +
  3. +

    The Web UI should be available at http://localhost:15672/

    +
      +
    • You can see default login credentials (rabbit:1234) in the compose file.
    • +
    +
  4. +
  5. +

    Install pika, the Python library for interacting with RabbitMQ.

    +
      +
    • It’s recommended to create a virtual environment instead of installing pika globally.
    • +
    • You can do so by running the following commands (you may need to install python3-venv)
      python -m venv venv
      +source venv/bin/activate
      +pip install pika
      +
      +
    • +
    +
  6. +
  7. +

    Implement the system components as described above. Refer to this tutorial and pika documentation for help.

    +
      +
    • Connect to RabbitMQ server at localhost and create a channel.
    • +
    • Use the amq.topic exchange, or create your own exchange of type topic
    • +
    • Send sensor values with a routing key starting with co2 (e.g., co2.sensor)
    • +
    • Send control queries with a routing key starting with rep (e.g., rep.current and rep.average)
    • +
    • Configure the receiver to listen for co2.* and the reporter to listen for rep.*
    • +
    • Don’t forget to ack the received messages so that they do not remain in the queue.
    • +
    +
  8. +
  9. +

    Submit a single ZIP archive named NameSurname.zip with publishers and subscribers directories inside.

    +
  10. +

Example Run

$ python publishers/sensor.py
+Enter CO2 level: 499
+Enter CO2 level: 500
+Enter CO2 level: 501
+Enter CO2 level: 500
+
$ python subscribers/receiver.py
+[*] Waiting for CO2 data. Press CTRL+C to exit
+2023-04-03 17:19:29: OK
+2023-04-03 17:20:02: OK
+2023-04-03 17:21:05: WARNING
+2023-04-03 17:22:08: OK
+

$ python publishers/control-tower.py
+Enter Query: current
+Enter Query: average
+
$ python subscribers/reporter.py
+[*] Waiting for queries from the control tower. Press CTRL+C to exit
+2023-04-03 17:19:30: Latest CO2 level is 499
+2023-04-03 17:23:02: Average CO2 level is 500.0
+

Checklist

+ + + + + + + + + diff --git a/lab4/.gitignore b/lab4/.gitignore new file mode 100644 index 0000000..5ceb386 --- /dev/null +++ b/lab4/.gitignore @@ -0,0 +1 @@ +venv diff --git a/lab4/AmirlanSharipov/.gitignore b/lab4/AmirlanSharipov/.gitignore new file mode 100644 index 0000000..9d458c7 --- /dev/null +++ b/lab4/AmirlanSharipov/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +db.sqlite diff --git a/lab4/AmirlanSharipov/client.py b/lab4/AmirlanSharipov/client.py new file mode 100644 index 0000000..460e311 --- /dev/null +++ b/lab4/AmirlanSharipov/client.py @@ -0,0 +1,42 @@ +import grpc + +import schema_pb2 as service +import schema_pb2_grpc as stub + + +def put_user(user_id, user_name): + args = service.User(user_id=user_id, user_name=user_name) + response = stub.PutUser(args) + print(f"PutUser({user_id}, '{user_name}') = {response.status}") + + +def get_users(): + args = service.EmptyMessage() + response = stub.GetUsers(args) + result = {} + for user in response.users: + result[user.user_id] = user.user_name + print(f"GetUsers() = {result}") + + +def delete_user(user_id): + args = service.User(user_id=user_id) + response = stub.DeleteUser(args) + print(f"DeleteUser({user_id}) = {response.status}") + + +if __name__ == '__main__': + with grpc.insecure_channel('localhost:1234') as channel: + stub = stub.DatabaseStub(channel) + + # Create four users + [put_user(i, f"User{i}") for i in range(1, 5)] + + # Update the usename of the second user + put_user(2, "User2_updated") + + # Delete the thrid user + delete_user(3) + + # Retrieve all users + get_users() diff --git a/lab4/AmirlanSharipov/schema.proto b/lab4/AmirlanSharipov/schema.proto new file mode 100644 index 0000000..0f1b546 --- /dev/null +++ b/lab4/AmirlanSharipov/schema.proto @@ -0,0 +1,29 @@ +syntax = 'proto3'; + +service Database { + rpc GetUsers(EmptyMessage) returns (UsersResponse); + rpc DeleteUser(User) returns (Response); + rpc PutUser(User) returns (Response); +} + +message User { + int32 user_id = 1; + string 
user_name = 2; +} + +message UserDelete { + int32 user_id = 1; +} + +message EmptyMessage { + +} + +message Response { + bool status = 1; +} + +message UsersResponse { + repeated User users = 1; +} + diff --git a/lab4/AmirlanSharipov/schema_pb2.py b/lab4/AmirlanSharipov/schema_pb2.py new file mode 100644 index 0000000..cd254a3 --- /dev/null +++ b/lab4/AmirlanSharipov/schema_pb2.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: schema.proto +"""Generated protocol buffer code.""" +from google.protobuf.internal import builder as _builder +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cschema.proto\"*\n\x04User\x12\x0f\n\x07user_id\x18\x01 \x01(\x05\x12\x11\n\tuser_name\x18\x02 \x01(\t\"\x1d\n\nUserDelete\x12\x0f\n\x07user_id\x18\x01 \x01(\x05\"\x0e\n\x0c\x45mptyMessage\"\x1a\n\x08Response\x12\x0e\n\x06status\x18\x01 \x01(\x08\"%\n\rUsersResponse\x12\x14\n\x05users\x18\x01 \x03(\x0b\x32\x05.User2r\n\x08\x44\x61tabase\x12)\n\x08GetUsers\x12\r.EmptyMessage\x1a\x0e.UsersResponse\x12\x1e\n\nDeleteUser\x12\x05.User\x1a\t.Response\x12\x1b\n\x07PutUser\x12\x05.User\x1a\t.Responseb\x06proto3') + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'schema_pb2', globals()) +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _USER._serialized_start=16 + _USER._serialized_end=58 + _USERDELETE._serialized_start=60 + _USERDELETE._serialized_end=89 + _EMPTYMESSAGE._serialized_start=91 + _EMPTYMESSAGE._serialized_end=105 + _RESPONSE._serialized_start=107 + _RESPONSE._serialized_end=133 + _USERSRESPONSE._serialized_start=135 + 
_USERSRESPONSE._serialized_end=172 + _DATABASE._serialized_start=174 + _DATABASE._serialized_end=288 +# @@protoc_insertion_point(module_scope) diff --git a/lab4/AmirlanSharipov/schema_pb2_grpc.py b/lab4/AmirlanSharipov/schema_pb2_grpc.py new file mode 100644 index 0000000..b12bff1 --- /dev/null +++ b/lab4/AmirlanSharipov/schema_pb2_grpc.py @@ -0,0 +1,132 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +import schema_pb2 as schema__pb2 + + +class DatabaseStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.GetUsers = channel.unary_unary( + '/Database/GetUsers', + request_serializer=schema__pb2.EmptyMessage.SerializeToString, + response_deserializer=schema__pb2.UsersResponse.FromString, + ) + self.DeleteUser = channel.unary_unary( + '/Database/DeleteUser', + request_serializer=schema__pb2.User.SerializeToString, + response_deserializer=schema__pb2.Response.FromString, + ) + self.PutUser = channel.unary_unary( + '/Database/PutUser', + request_serializer=schema__pb2.User.SerializeToString, + response_deserializer=schema__pb2.Response.FromString, + ) + + +class DatabaseServicer(object): + """Missing associated documentation comment in .proto file.""" + + def GetUsers(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def DeleteUser(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def PutUser(self, request, context): + """Missing associated 
documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_DatabaseServicer_to_server(servicer, server): + rpc_method_handlers = { + 'GetUsers': grpc.unary_unary_rpc_method_handler( + servicer.GetUsers, + request_deserializer=schema__pb2.EmptyMessage.FromString, + response_serializer=schema__pb2.UsersResponse.SerializeToString, + ), + 'DeleteUser': grpc.unary_unary_rpc_method_handler( + servicer.DeleteUser, + request_deserializer=schema__pb2.User.FromString, + response_serializer=schema__pb2.Response.SerializeToString, + ), + 'PutUser': grpc.unary_unary_rpc_method_handler( + servicer.PutUser, + request_deserializer=schema__pb2.User.FromString, + response_serializer=schema__pb2.Response.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'Database', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. 
+class Database(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def GetUsers(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/Database/GetUsers', + schema__pb2.EmptyMessage.SerializeToString, + schema__pb2.UsersResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def DeleteUser(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/Database/DeleteUser', + schema__pb2.User.SerializeToString, + schema__pb2.Response.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def PutUser(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/Database/PutUser', + schema__pb2.User.SerializeToString, + schema__pb2.Response.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/lab4/AmirlanSharipov/server.py b/lab4/AmirlanSharipov/server.py new file mode 100644 index 0000000..a42c6f8 --- /dev/null +++ b/lab4/AmirlanSharipov/server.py @@ -0,0 +1,78 @@ +import sqlite3 +from concurrent.futures import ThreadPoolExecutor + +import grpc +import schema_pb2 as stub +import schema_pb2_grpc as service + +SERVER_ADDR = '0.0.0.0:1234' + + +class Database(service.DatabaseServicer): + def PutUser(self, request, context): + try: + con = 
sqlite3.connect('db.sqlite') + cur = con.cursor() + print(request.user_id, request.user_name) + cur.execute("INSERT OR REPLACE INTO User (id, name) VALUES (?, ?)", + (request.user_id, request.user_name)) + con.commit() + con.close() + return stub.Response(status=1) + except Exception as inst: + con.close() + return stub.Response(status=0) + + def GetUsers(self, request, context): + try: + con = sqlite3.connect('db.sqlite') + cur = con.cursor() + res = cur.execute('SELECT id, name FROM User') + users = [] + + for user in res.fetchall(): + users.append({'user_id': user[0], 'user_name': user[1]}) + + con.commit() + con.close() + return stub.UsersResponse(users=users) + except Exception: + con.close() + return stub.UsersResponse(users=[]) + + def DeleteUser(self, request, context): + try: + con = sqlite3.connect('db.sqlite') + cur = con.cursor() + cur.execute('DELETE FROM User WHERE id=?', (request.user_id, )) + con.commit() + con.close() + return stub.Response(status=1) + except Exception: + con.close() + return stub.Response(status=0) + + +def create_table(): + try: + con = sqlite3.connect('db.sqlite') + cur = con.cursor() + cur.execute('CREATE TABLE IF NOT EXISTS User(id INTEGER PRIMARY KEY, name TEXT)') + con.commit() + con.close() + print('Created a table') + except Exception: + print('DOESNT WORK') + + +if __name__ == '__main__': + create_table() + try: + server = grpc.server(ThreadPoolExecutor(max_workers=30)) + service.add_DatabaseServicer_to_server(Database(), server) + server.add_insecure_port(SERVER_ADDR) + server.start() + print(f'listening on {SERVER_ADDR}') + server.wait_for_termination() + except KeyboardInterrupt: + print('Interrupted by user. Shutting down...') diff --git a/lab4/Lab 4.html b/lab4/Lab 4.html new file mode 100644 index 0000000..2e0329c --- /dev/null +++ b/lab4/Lab 4.html @@ -0,0 +1,312 @@ + + + + + + + + + + + + + Week 4 - Remote Procedure Call - HackMD + + + + + + + + + + + + + + + + + +

Week 4 - Remote Procedure Call

+

Distributed Systems and Network Programming - Spring 2023

+

Overview

Your task for this lab is to use gRPC to remotely call functions defined on a server.

Remote Functions

The server should expose the following functions for RPC, the logic for each function is explained below:

Task

    +
  1. +

    Create a Python virtual environment and install the required external dependencies

    +
    python3 -m venv venv
    +source venv/bin/activate
    +pip3 install grpcio grpcio-tools
    +
    +
  2. +
  3. +

    Create schema.proto which defines the following:

    +
      +
    • The database service with remote functions that can be called through rpc.
    • +
    • Request/response message format for the client/server communication.
    • +
    +
  4. +
  5. +

    Compile the schema file to generate the stub and service source files (schema_pb2_grpc.py and schema_pb2.py) using the following command:

    +
    python3 -m grpc_tools.protoc --proto_path=. --python_out=. --grpc_python_out=. schema.proto
    +
    +
  6. +
  7. +

    Write the gRPC server code to do the following:

    +
      +
    • Create or overwrite a local database file db.sqlite in the current directory.
    • +
    • Initialize the database with an empty table Users(id INTEGER, name STRING)
    • +
    • Create a grpc.server that listens forever for client RPC requests and executes them. +
        +
      • The server should terminate gracefully whenever a KeyboardInterrupt is received.
      • +
      • Upon receiving a request from the client, the server should print the name of the function to be executed along with the supplied arguments.
      • +
      +
    • +
    • Implement the functions for PutUser, DeleteUser, and GetUsers as explained above.
    • +
    +
  8. +
  9. +

    Run your gRPC server, then run the given client (you are not supposed to modify the client).

    +
  10. +
  11. +

    Verify that the database was populated by inspecting the file db.sqlite using your favorite SQLite viewer tool/extension.

    +
  12. +

Example Run

$ python3 server.py
+gRPC server is listening on 0.0.0.0:1234
+PutUser(1, 'User1')
+PutUser(2, 'User2')
+PutUser(3, 'User3')
+PutUser(4, 'User4')
+PutUser(2, 'User2_updated')
+DeleteUser(3)
+GetUsers()
+
$ python3 client.py
+PutUser(1, 'User1') = True
+PutUser(2, 'User2') = True
+PutUser(3, 'User3') = True
+PutUser(4, 'User4') = True
+PutUser(2, 'User2_updated') = True
+DeleteUser(3) = True
+GetUsers() = {1: 'User1', 2: 'User2_updated', 4: 'User4'}
+

Checklist

+ + + + + + + + + diff --git a/lab5/Week 5.html b/lab5/Week 5.html new file mode 100644 index 0000000..9216e48 --- /dev/null +++ b/lab5/Week 5.html @@ -0,0 +1,385 @@ + + + + + + + + + + + + + Week 5 - Distributed Hash Tables - HackMD + + + + + + + + + + + + + + + + + +

Week 5 - Distributed Hash Tables

+

Distributed Systems and Network Programming - Spring 2023

+

Overview

Your task for this lab is to implement a simplified version of the Chord algorithm used to maintain a Distributed Hash Table (DHT) in peer-to-peer systems

System Architecture

Chord operates over a structured P2P overlay network in which nodes (peers) are organized in a ring

Node

Client

Task

Example Run

Input

Output

Visualization

Checklist

Additional Notes

+ + + + + + + + + diff --git a/lab5/lab5-task/Dockerfile b/lab5/lab5-task/Dockerfile new file mode 100644 index 0000000..6abdea0 --- /dev/null +++ b/lab5/lab5-task/Dockerfile @@ -0,0 +1,7 @@ +FROM python:alpine + +WORKDIR /app + +ENV PYTHONUNBUFFERED=1 + +COPY *.py ./ diff --git a/lab5/lab5-task/client.py b/lab5/lab5-task/client.py new file mode 100644 index 0000000..15bc6c8 --- /dev/null +++ b/lab5/lab5-task/client.py @@ -0,0 +1,29 @@ +from xmlrpc.client import ServerProxy +import random + +M = 5 +PORT = 1234 +RING = [2, 7, 11, 17, 22, 27] + + +def lookup(node_id: int, key: int) -> None: + """Calls the get method for a remote node over RPC and print the result""" + with ServerProxy(f'http://node_{node_id}:{PORT}') as node: + print(f"lookup({node_id}, {key}) = {node.get(key)}") + + +if __name__ == '__main__': + print("Client started") + + # Asking a random node to insert an entry into the DHT + # String keys should be consistently hashed to an integer value between 0 and (2**M)-1 + for i in range(2 ** M): + with ServerProxy(f'http://node_{random.choice(RING)}:{PORT}') as node: + node.put(i, f"value_{i}") + + # Querying a DHT node for the value of a certain key. + lookup(2, 20) + lookup(11, 15) + lookup(17, 1) + lookup(22, 27) + lookup(2, 5) diff --git a/lab5/lab5-task/docker-compose.yml b/lab5/lab5-task/docker-compose.yml new file mode 100644 index 0000000..a6710c8 --- /dev/null +++ b/lab5/lab5-task/docker-compose.yml @@ -0,0 +1,45 @@ +version: '3' + +services: + client: + image: chord + build: . + container_name: client + command: python3 client.py + depends_on: [ node_2, node_7, node_11, node_17, node_22, node_27 ] + + node_2: + image: chord + build: . + container_name: node_2 + command: sh -c 'python3 node*.py 2' + + node_7: + image: chord + build: . + container_name: node_7 + command: sh -c 'python3 node*.py 7' + + node_11: + image: chord + build: . + container_name: node_11 + command: sh -c 'python3 node*.py 11' + + node_17: + image: chord + build: . 
from argparse import ArgumentParser
from bisect import bisect_left, bisect_right
from threading import Thread
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer

import traceback

M = 5                           # identifier space is [0, 2**M)
PORT = 1234                     # every node listens on the same port (one node per container)
RING = [2, 7, 11, 17, 22, 27]   # all known node ids, sorted ascending


class Node:
    def __init__(self, node_id):
        """Initialize the node and build its finger table with the Chord formula.

        Assumes full ring membership is known up-front (sorted array RING), so
        successor, predecessor and all fingers can be computed locally without
        any join protocol.
        """
        self.node_id = node_id
        self.finger_table = []
        self.successor_id = RING[(RING.index(node_id) + 1) % len(RING)]
        self.predecessor_id = RING[RING.index(node_id) - 1]
        self.table = {}  # local key-value store for keys this node is responsible for

        # finger[i] = successor((node_id + 2**i) mod 2**M).
        # bisect_left finds the first ring node >= target; the outer
        # `% len(RING)` wraps past the largest id back to the smallest.
        for i in range(M):
            self.finger_table.append(RING[bisect_left(RING, ((node_id + (2 ** i)) % (2 ** M))) % len(RING)])

        print(f"Node created! Finger table = {self.finger_table}, [pred, succ] = [{self.predecessor_id}, {self.successor_id}]")

    def closest_preceding_node(self, id):
        """Return the closest preceding node (from the finger table) for a given id.

        Interval membership on the circular id space is tested with bisect over
        a two-element [L, R] list: idx 0 or 2 means "outside [L, R]", which is
        how the wrap-around cases are handled without explicit modular math.
        """
        for i in reversed(self.finger_table):
            if i == RING[-1]:
                # Finger is the largest ring id: it precedes ids that wrap
                # around past RING[-1] or sit before RING[0].
                idx = bisect_left([RING[0], RING[-1]], id)
                if idx == 0 or idx == 2:
                    return i
            elif self.node_id > i:
                # Interval (i, self.node_id) does not wrap.
                idx = bisect_left([i, self.node_id], id)
                if idx == 1:
                    return i
            else:
                if i > self.node_id and i < id:
                    return i
        return self.finger_table[-1]

    def find_successor(self, id):
        """Recursively return the identifier of the node responsible for `id`."""

        if id == self.node_id:
            return id

        # Note the half-open interval and that L <= R does not necessarily hold
        if self.successor_id < self.node_id:
            # (node_id, successor_id] wraps around 0.
            idx = bisect_left([self.successor_id, self.node_id], id)
            if idx == 0 or idx == 2:
                return self.successor_id
        elif id in range(self.node_id, self.successor_id + 1):
            return self.successor_id

        # Forward the query to the closest preceding node in the finger table for n
        n0 = self.closest_preceding_node(id)
        print(f'Forwarding request to node {n0}')
        with ServerProxy(f'http://node_{n0}:{PORT}') as proxy:
            return proxy.find_successor(id)

    def put(self, key, value):
        """Store the key-value pair in the node responsible for it.

        Returns True on success and False on any failure (logged locally).
        """
        try:
            print(f"put({key}, {value})")

            # This node owns keys in (predecessor_id, node_id], possibly wrapping.
            if self.node_id < self.predecessor_id:
                idx = bisect_left([self.node_id, self.predecessor_id], key)
                if idx == 0 or idx == 2:
                    return self.store_item(key, value)
            elif key in range(self.predecessor_id, self.node_id + 1):
                return self.store_item(key, value)

            n0 = self.find_successor(key)
            if self.node_id == n0:
                return self.store_item(key, value)

            with ServerProxy(f'http://node_{n0}:{PORT}') as proxy:
                return proxy.store_item(key, value)
        except Exception as e:
            print(f"couldn't put({key}, {value})")
            print(traceback.format_exc())
            print(e)
            return False

    def get(self, key):
        """Get the value for a given key from the node responsible for it.

        Returns the stored value, or -1 if the key is absent or lookup failed.
        """
        try:
            print(f"get({key})")
            # Same ownership test as put(): keys in (predecessor_id, node_id].
            if self.node_id < self.predecessor_id:
                idx = bisect_left([self.node_id, self.predecessor_id], key)
                if idx == 0 or idx == 2:
                    return self.retrieve_item(key)
            elif key in range(self.predecessor_id, self.node_id + 1):
                return self.retrieve_item(key)

            n0 = self.find_successor(key)
            if self.node_id == n0:
                return self.retrieve_item(key)

            with ServerProxy(f'http://node_{n0}:{PORT}') as proxy:
                return proxy.retrieve_item(key)
        except Exception as e:
            print(f"couldn't get({key})")
            print(traceback.format_exc())
            print(e)
            return -1

    def store_item(self, key, value):
        """Store a key-value pair into the data store of this node; always True."""
        self.table[key] = value
        return True

    def retrieve_item(self, key):
        """Retrieve a value for a given key from this node's store; -1 if absent."""
        if key in self.table:
            return self.table[key]
        return -1


if __name__ == '__main__':
    try:
        parser = ArgumentParser()
        parser.add_argument('node_id')
        args = parser.parse_args()
        node = Node(int(args.node_id))

        server = SimpleXMLRPCServer(('0.0.0.0', PORT))
        # Use the PORT constant here so the log can never disagree with the bind.
        print(f"Listening on port {PORT}...")
        server.register_function(node.get, "get")
        server.register_function(node.put, "put")
        server.register_function(node.retrieve_item, "retrieve_item")
        server.register_function(node.store_item, "store_item")
        server.register_function(node.find_successor, "find_successor")
        server.serve_forever()
    except KeyboardInterrupt:
        print("node killed...")
        exit()

Week 6 - Distributed Consensus

+

Distributed Systems and Network Programming - Spring 2023

+

Overview

Your task for this lab is to use the RAFT algorithm to maintain consensus among 3 nodes in a P2P system. All nodes in the cluster need to maintain a consistent replicated log (a list of strings).

Brief Algorithm Description

RAFT works in two phases: leader election and log replication.

Leader Election

Log Replication

Task

Example Run

Input

Output

Typical Run

typical_run

Split-vote case

split_vote

Dead leader

img

Dead follower

dead_follower

Checklist

Additional Notes

from xmlrpc.client import ServerProxy
import socket
import time

PORT = 1234                    # every node's RPC port (one node per container)
CLUSTER = [1, 2, 3]            # ids of all nodes in the cluster
LOGS = ['SET 5', 'ADD 1']      # entries to submit to the cluster leader

if __name__ == '__main__':
    time.sleep(10)  # Wait for the leader election process to settle
    print('Client started')
    for node_id in CLUSTER:
        try:
            with ServerProxy(f'http://node_{node_id}:{PORT}') as node:
                if node.is_leader():
                    print(f"Node {node_id} is the cluster leader. Sending logs")
                    for log in LOGS:
                        if node.leader_receive_log(log):
                            print(f"Leader committed '{log}'")
                        time.sleep(5)  # Wait for entries to propagate
                    # There is at most one leader per term; no need to probe
                    # the remaining nodes once the logs have been delivered.
                    break
        except socket.error as e:
            print(f"Failed to connect to node_{node_id}: {e}")
import random
import sched
import socket
import time
import traceback  # needed: exception handlers below call traceback.format_exc()
from threading import Thread
from argparse import ArgumentParser
from enum import Enum
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer

PORT = 1234                    # every node listens on the same port (one node per container)
CLUSTER = [1, 2, 3]            # ids of all nodes in the cluster
ELECTION_TIMEOUT = (6, 8)      # randomized follower election timeout range, seconds
HEARTBEAT_INTERVAL = 5         # leader heartbeat period, seconds


class NodeState(Enum):
    """Enumerates the three possible node states (follower, candidate, or leader)"""
    FOLLOWER = 1
    CANDIDATE = 2
    LEADER = 3


class Node:
    def __init__(self, node_id):
        """Non-blocking procedure to initialize all node parameters and start the first election timer"""
        self.node_id = node_id
        self.state = NodeState.FOLLOWER
        self.term = 0
        self.votes = {}           # votes seen in the current term, keyed by node id
        self.log = []             # committed log entries (list of strings)
        self.pending_entry = ''   # replicated-but-uncommitted entry ('' means none)
        self.sched = sched.scheduler()
        self.event = ''
        self.reset_election_timer()
        print(f"Node started! State: {self.state}. Term: {self.term}")

    def is_leader(self):
        """Returns True if this node is the elected cluster leader and False otherwise"""
        if self.state == NodeState.LEADER:
            return True
        return False

    def reset_election_timer(self):
        """Resets election timer for this (follower or candidate) node and returns it to the follower state"""
        self.state = NodeState.FOLLOWER

        # Cancel any previously scheduled timer before arming a new one.
        q = self.sched.queue
        for event in q:
            self.sched.cancel(event)

        self.sched.enter(random.uniform(ELECTION_TIMEOUT[0], ELECTION_TIMEOUT[1]), 1, self.hold_election, ())

    def reset_heartbeat_timer(self):
        """Cancels any pending timer and schedules the next leader heartbeat."""
        q = self.sched.queue
        for event in q:
            self.sched.cancel(event)

        self.sched.enter(HEARTBEAT_INTERVAL, 1, self.append_entries, ())

    def hold_election(self):
        """Called when this follower node is done waiting for a message from a leader (election timeout)
        The node increments term number, becomes a candidate and votes for itself.
        Then call request_vote over RPC for all other online nodes and collects their votes.
        If the node gets the majority of votes, it becomes a leader and starts the hearbeat timer
        If the node loses the election, it returns to the follower state and resets election timer.
        """
        self.term = self.term + 1
        self.state = NodeState.CANDIDATE
        self.votes = {}
        self.votes[self.node_id] = True  # vote for self
        print(f'New election term {self.term}. State: {self.state}')

        for n0 in CLUSTER:
            # BUG FIX: the original compared the global `node` object to
            # self.node_id, so the candidate never skipped itself here.
            if n0 == self.node_id:
                continue

            try:
                print(f'Requesting vote from node {n0}')
                with ServerProxy(f'http://node_{n0}:{PORT}') as proxy:
                    if proxy.request_vote(self.term, self.node_id):
                        self.votes[n0] = True
                    else:
                        self.votes[n0] = False
            except Exception as e:
                print(f"couldn't request_vote from {n0}")
                print(traceback.format_exc())
                print(e)

        # Strict majority of the whole cluster (offline nodes count against us).
        if sum(self.votes.values()) > len(CLUSTER) / 2:
            self.state = NodeState.LEADER
            self.reset_heartbeat_timer()

        print(f"New election term {self.term}. State: {self.state}")

    def request_vote(self, term, candidate_id):
        """Called remotely when a node requests voting from other nodes.
        Updates the term number if the received one is greater than `self.term`
        A node rejects the vote request if it's a leader or it already voted in this term.
        Returns True and update `self.votes` if the vote is granted to the requester candidate and False otherwise.
        """

        print(f"Got a vote request from {candidate_id} (term={term})")
        self.reset_election_timer()

        if term > self.term:
            # Newer term: adopt it and forget any votes cast in the old term.
            self.term = term
            self.votes = {}

        if self.is_leader() or len(self.votes) > 0:
            return False

        self.votes[candidate_id] = True
        return True

    def append_entries(self):
        """Called by leader every HEARTBEAT_INTERVAL, sends a heartbeat message over RPC to all online followers.
        Accumulates ACKs from followers for a pending log entry (if any)
        If the majority of followers ACKed the entry, the entry is committed to the log and is no longer pending
        """
        print("Sending a heartbeat to followers")

        acks = 0
        for n0 in CLUSTER:
            if n0 == self.node_id:
                continue

            try:
                with ServerProxy(f'http://node_{n0}:{PORT}') as proxy:
                    if proxy.heartbeat(self.pending_entry):
                        acks = acks + 1
            except Exception as e:
                print(f"couldn't heartbeat {n0}")
                print(traceback.format_exc())
                print(e)

        if self.pending_entry != '' and acks > len(CLUSTER) / 2:
            self.log.append(self.pending_entry)
            print(f'Leader committed \'{self.pending_entry}\'')
            self.pending_entry = ''

        self.reset_heartbeat_timer()

    def heartbeat(self, leader_entry):
        """Called remotely from the leader to inform followers that it's alive and supply any pending log entry
        Followers would commit an entry if it was pending before, but is no longer now.
        Returns True to ACK the heartbeat and False on any problems.
        """
        print(f"Heartbeat received from leader (entry='{leader_entry}')")
        try:
            self.reset_election_timer()
            # The leader stopped sending this entry => it reached majority; commit it.
            if self.pending_entry != '' and leader_entry != self.pending_entry:
                self.log.append(self.pending_entry)
                print(f'Follower committed \'{self.pending_entry}\'')

            self.pending_entry = leader_entry

            return True
        except Exception:
            return False

    def leader_receive_log(self, log):
        """Called remotely from the client. Executed only by the leader upon receiving a new log entry
        Returns True after the entry is committed to the leader log and False on any problems
        """
        print(f"Leader received log \'{log}\' from client")
        # Only one entry may be in flight at a time; wait for the previous one.
        while self.pending_entry != '':
            time.sleep(1)

        self.pending_entry = log
        time.sleep(7)  # give append_entries at least one heartbeat round to commit
        if self.pending_entry == '' and self.log[-1] == log:
            return True
        return False


if __name__ == '__main__':
    try:
        # Parse one integer argument (node_id) and create the node with that ID.
        parser = ArgumentParser()
        parser.add_argument('node_id')
        args = parser.parse_args()
        node = Node(int(args.node_id))

        # Run the node's timer scheduler in an isolated thread.
        t = Thread(target=node.sched.run)
        t.start()

        # Expose the node instance over RPC on 0.0.0.0:PORT.
        server = SimpleXMLRPCServer(('0.0.0.0', PORT), logRequests=False)
        print(f"Listening on port {PORT}...")
        server.register_function(node.leader_receive_log, "leader_receive_log")
        server.register_function(node.heartbeat, "heartbeat")
        server.register_function(node.request_vote, "request_vote")
        server.register_function(node.is_leader, "is_leader")
        server.serve_forever()
    except KeyboardInterrupt:
        print("node killed...")
        exit()