@@ -0,0 +1 @@ | |||
lec |
@@ -0,0 +1,22 @@ | |||
from xmlrpc.client import ServerProxy | |||
import socket | |||
import time | |||
PORT = 1234 | |||
CLUSTER = [1, 2, 3] | |||
LOGS = ['SET 5', 'ADD 1'] | |||
if __name__ == '__main__': | |||
time.sleep(10) # Wait for leader election processs | |||
print('Client started') | |||
for node_id in CLUSTER: | |||
try: | |||
with ServerProxy(f'http://node_{node_id}:{PORT}') as node: | |||
if node.is_leader(): | |||
print(f"Node {node_id} is the cluster leader. Sending logs") | |||
for log in LOGS: | |||
if node.leader_receive_log(log): | |||
print(f"Leader committed '{log}'") | |||
time.sleep(5) # Wait for entries to propagate | |||
except socket.error as e: | |||
print(f"Failed to connect to node_{node_id}: {e}") |
@@ -0,0 +1,205 @@ | |||
import random | |||
import sched | |||
import socket | |||
import time | |||
from threading import Thread | |||
from argparse import ArgumentParser | |||
from enum import Enum | |||
from xmlrpc.client import ServerProxy | |||
from xmlrpc.server import SimpleXMLRPCServer | |||
PORT = 1234 | |||
CLUSTER = [1, 2, 3] | |||
ELECTION_TIMEOUT = (6, 8) | |||
HEARTBEAT_INTERVAL = 5 | |||
class NodeState(Enum): | |||
"""Enumerates the three possible node states (follower, candidate, or leader)""" | |||
FOLLOWER = 1 | |||
CANDIDATE = 2 | |||
LEADER = 3 | |||
class Node: | |||
def __init__(self, node_id): | |||
"""Non-blocking procedure to initialize all node parameters and start the first election timer""" | |||
self.node_id = node_id | |||
self.state = NodeState.FOLLOWER | |||
self.term = 0 | |||
self.votes = {} | |||
self.log = [] | |||
self.pending_entry = '' | |||
self.sched = sched.scheduler() | |||
self.event = '' | |||
# TODO: start election timer for this node | |||
self.reset_election_timer() | |||
print(f"Node started! State: {self.state}. Term: {self.term}") | |||
def is_leader(self): | |||
"""Returns True if this node is the elected cluster leader and False otherwise""" | |||
if self.state == NodeState.LEADER: | |||
return True | |||
return False | |||
def reset_election_timer(self): | |||
"""Resets election timer for this (follower or candidate) node and returns it to the follower state""" | |||
self.state = NodeState.FOLLOWER | |||
q = self.sched.queue | |||
for event in q: | |||
self.sched.cancel(event) | |||
#if (self.node_id == 1 or self.node_id == 3): | |||
# self.sched.enter(0, 1, self.hold_election, ()) | |||
# return | |||
self.sched.enter(random.uniform(ELECTION_TIMEOUT[0], ELECTION_TIMEOUT[1]), 1, self.hold_election, ()) | |||
def reset_heartbeat_timer(self): | |||
q = self.sched.queue | |||
for event in q: | |||
self.sched.cancel(event) | |||
self.sched.enter(HEARTBEAT_INTERVAL, 1, self.append_entries, ()) | |||
def hold_election(self): | |||
"""Called when this follower node is done waiting for a message from a leader (election timeout) | |||
The node increments term number, becomes a candidate and votes for itself. | |||
Then call request_vote over RPC for all other online nodes and collects their votes. | |||
If the node gets the majority of votes, it becomes a leader and starts the hearbeat timer | |||
If the node loses the election, it returns to the follower state and resets election timer. | |||
""" | |||
self.term = self.term + 1 | |||
self.state = NodeState.CANDIDATE | |||
self.votes = {} | |||
self.votes[self.node_id] = True | |||
print(f'New election term {self.term}. State: {self.state}') | |||
for n0 in CLUSTER: | |||
if node == self.node_id: | |||
continue | |||
try: | |||
print(f'Requesting vote from node {n0}') | |||
with ServerProxy(f'http://node_{n0}:{PORT}') as proxy: | |||
if proxy.request_vote(self.term, self.node_id): | |||
self.votes[n0] = True | |||
else: | |||
self.votes[n0] = False | |||
except Exception as e: | |||
print(f"couldn't request_vote from {n0}") | |||
print(traceback.format_exc()) | |||
print(e) | |||
if sum(self.votes.values()) > len(CLUSTER) / 2: | |||
self.state = NodeState.LEADER | |||
self.reset_heartbeat_timer() | |||
print(f"New election term {self.term}. State: {self.state}") | |||
def request_vote(self, term, candidate_id): | |||
"""Called remotely when a node requests voting from other nodes. | |||
Updates the term number if the received one is greater than `self.term` | |||
A node rejects the vote request if it's a leader or it already voted in this term. | |||
Returns True and update `self.votes` if the vote is granted to the requester candidate and False otherwise. | |||
""" | |||
print(f"Got a vote request from {candidate_id} (term={term})") | |||
self.reset_election_timer() | |||
if term > self.term: | |||
self.term = term | |||
self.votes = {} | |||
if self.is_leader() or len(self.votes) > 0: | |||
return False | |||
self.votes[candidate_id] = True | |||
return True | |||
def append_entries(self): | |||
"""Called by leader every HEARTBEAT_INTERVAL, sends a heartbeat message over RPC to all online followers. | |||
Accumulates ACKs from followers for a pending log entry (if any) | |||
If the majority of followers ACKed the entry, the entry is committed to the log and is no longer pending | |||
""" | |||
print("Sending a heartbeat to followers") | |||
acks = 0 | |||
for n0 in CLUSTER: | |||
if n0 == self.node_id: | |||
continue | |||
try: | |||
with ServerProxy(f'http://node_{n0}:{PORT}') as proxy: | |||
if proxy.heartbeat(self.pending_entry): | |||
acks = acks + 1 | |||
except Exception as e: | |||
print(f"couldn't heartbeat {n0}") | |||
print(traceback.format_exc()) | |||
print(e) | |||
if self.pending_entry != '' and acks > len(CLUSTER) / 2: | |||
self.log.append(self.pending_entry) | |||
print(f'Leader commited \'{self.pending_entry}\'') | |||
self.pending_entry = '' | |||
self.reset_heartbeat_timer() | |||
def heartbeat(self, leader_entry): | |||
"""Called remotely from the leader to inform followers that it's alive and supply any pending log entry | |||
Followers would commit an entry if it was pending before, but is no longer now. | |||
Returns True to ACK the heartbeat and False on any problems. | |||
""" | |||
print(f"Heartbeat received from leader (entry='{leader_entry}')") | |||
try: | |||
self.reset_election_timer() | |||
if self.pending_entry != '' and leader_entry != self.pending_entry: | |||
self.log.append(self.pending_entry) | |||
print(f'Follower commited \'{self.pending_entry}\'') | |||
self.pending_entry = leader_entry | |||
return True | |||
except Exception as e: | |||
return False | |||
def leader_receive_log(self, log): | |||
"""Called remotely from the client. Executed only by the leader upon receiving a new log entry | |||
Returns True after the entry is committed to the leader log and False on any problems | |||
""" | |||
print(f"Leader received log \'{log}\' from client") | |||
while self.pending_entry != '': | |||
time.sleep(1) | |||
self.pending_entry = log | |||
time.sleep(7) | |||
if self.pending_entry == '' and self.log[-1] == log: | |||
return True | |||
return False | |||
if __name__ == '__main__': | |||
# TODO: Parse one integer argument (node_id), then create the node with that ID. | |||
# TODO: Start RPC server on 0.0.0.0:PORT and expose the node instance | |||
# TODO: Run the node scheduler in an isolated thread. | |||
# TODO: Handle KeyboardInterrupt and terminate gracefully. | |||
try: | |||
parser = ArgumentParser() | |||
parser.add_argument('node_id') | |||
args = parser.parse_args() | |||
node = Node(int(args.node_id)) | |||
t = Thread(target=node.sched.run) | |||
t.start() | |||
server = SimpleXMLRPCServer(('0.0.0.0', PORT), logRequests=False) | |||
print(f"Listening on port {PORT}...") | |||
server.register_function(node.leader_receive_log, "leader_receive_log") | |||
server.register_function(node.heartbeat, "heartbeat") | |||
server.register_function(node.request_vote, "request_vote") | |||
server.register_function(node.is_leader, "is_leader") | |||
server.serve_forever() | |||
except KeyboardInterrupt: | |||
print("node killed...") | |||
exit() | |||
@@ -0,0 +1,34 @@ | |||
import socket | |||
import json | |||
MSS = 20476 # MSS = Server buffer size (20480) - data header size (4) | |||
class Query: | |||
type = '' | |||
key = '' | |||
def __init__(self, type, key): | |||
self.type = type | |||
self.key = key | |||
def json_str(self): | |||
return json.dumps(self.__dict__) | |||
def await_response(s): | |||
data, addr = s.recvfrom(MSS) | |||
print(f'Server: {json.loads(data)}') | |||
if __name__ == "__main__": | |||
server_ip, server_port = ('127.0.0.1', 50000) | |||
queries = (Query('A', 'example.com'), | |||
Query('PTR', '1.2.3.4'), | |||
Query('CNAME', 'moodle.com')) | |||
with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as s: | |||
for query in queries: | |||
print(f'Performing query {query.json_str()}:') | |||
s.sendto(query.json_str().encode(), (server_ip, server_port)) | |||
await_response(s) | |||
@@ -0,0 +1,51 @@ | |||
import socket | |||
import json | |||
IP_ADDR = "0.0.0.0" | |||
PORT = 50000 | |||
MSS = 24000 # MSS = Server buffer size (20480) - data header size (4) | |||
class RR: | |||
type = '' | |||
key = '' | |||
value = '' | |||
def __init__(self, type, key, value): | |||
self.type = type | |||
self.key = key | |||
self.value = value | |||
def json_str(self): | |||
return json.dumps(self.__dict__) | |||
if __name__ == "__main__": | |||
server_port = PORT | |||
records = (RR('A', 'example.com', '1.2.3.4'), RR('PTR', '1.2.3.4', 'example.com')) | |||
with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as s: | |||
s.bind((IP_ADDR, server_port)) | |||
print(f'Listening on {IP_ADDR}:{server_port}...') | |||
try: | |||
while True: | |||
data, addr = s.recvfrom(MSS) | |||
query = json.loads(data) | |||
print(f'Client {query}') | |||
ok = False | |||
for record in records: | |||
if record.type == query['type'] and record.key == query['key']: | |||
print('Server: Record found. Sending answer.') | |||
s.sendto(record.json_str().encode(), addr) | |||
ok = True | |||
break | |||
if ok == False: | |||
print('Server: Record not found. Sending error.') | |||
record = RR(query['type'], query['key'], 'NXDOMAIN') | |||
s.sendto(record.json_str().encode(), addr) | |||
except KeyboardInterrupt: | |||
print('Server: Interrupted by user. Exiting') | |||
exit(0) |
@@ -0,0 +1 @@ | |||
__pycache__ |
@@ -0,0 +1,17 @@ | |||
syntax = 'proto3'; | |||
service Calculator { | |||
rpc Add(Request) returns (FloatResponse); | |||
rpc Substract(Request) returns (FloatResponse); | |||
rpc Multiply(Request) returns (FloatResponse); | |||
rpc Divide(Request) returns (FloatResponse); | |||
} | |||
message Request { | |||
int32 a = 1; | |||
int32 b = 2; | |||
} | |||
message FloatResponse { | |||
float ans = 1; | |||
} |
@@ -0,0 +1,29 @@ | |||
# -*- coding: utf-8 -*- | |||
# Generated by the protocol buffer compiler. DO NOT EDIT! | |||
# source: calculator.proto | |||
"""Generated protocol buffer code.""" | |||
from google.protobuf.internal import builder as _builder | |||
from google.protobuf import descriptor as _descriptor | |||
from google.protobuf import descriptor_pool as _descriptor_pool | |||
from google.protobuf import symbol_database as _symbol_database | |||
# @@protoc_insertion_point(imports) | |||
_sym_db = _symbol_database.Default() | |||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x63\x61lculator.proto\"\x1f\n\x07Request\x12\t\n\x01\x61\x18\x01 \x01(\x05\x12\t\n\x01\x62\x18\x02 \x01(\x05\"\x1c\n\rFloatResponse\x12\x0b\n\x03\x61ns\x18\x01 \x01(\x02\x32\x9e\x01\n\nCalculator\x12\x1f\n\x03\x41\x64\x64\x12\x08.Request\x1a\x0e.FloatResponse\x12%\n\tSubstract\x12\x08.Request\x1a\x0e.FloatResponse\x12$\n\x08Multiply\x12\x08.Request\x1a\x0e.FloatResponse\x12\"\n\x06\x44ivide\x12\x08.Request\x1a\x0e.FloatResponseb\x06proto3') | |||
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) | |||
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'calculator_pb2', globals()) | |||
if _descriptor._USE_C_DESCRIPTORS == False: | |||
DESCRIPTOR._options = None | |||
_REQUEST._serialized_start=20 | |||
_REQUEST._serialized_end=51 | |||
_FLOATRESPONSE._serialized_start=53 | |||
_FLOATRESPONSE._serialized_end=81 | |||
_CALCULATOR._serialized_start=84 | |||
_CALCULATOR._serialized_end=242 | |||
# @@protoc_insertion_point(module_scope) |
@@ -0,0 +1,165 @@ | |||
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! | |||
"""Client and server classes corresponding to protobuf-defined services.""" | |||
import grpc | |||
import calculator_pb2 as calculator__pb2 | |||
class CalculatorStub(object): | |||
"""Missing associated documentation comment in .proto file.""" | |||
def __init__(self, channel): | |||
"""Constructor. | |||
Args: | |||
channel: A grpc.Channel. | |||
""" | |||
self.Add = channel.unary_unary( | |||
'/Calculator/Add', | |||
request_serializer=calculator__pb2.Request.SerializeToString, | |||
response_deserializer=calculator__pb2.FloatResponse.FromString, | |||
) | |||
self.Substract = channel.unary_unary( | |||
'/Calculator/Substract', | |||
request_serializer=calculator__pb2.Request.SerializeToString, | |||
response_deserializer=calculator__pb2.FloatResponse.FromString, | |||
) | |||
self.Multiply = channel.unary_unary( | |||
'/Calculator/Multiply', | |||
request_serializer=calculator__pb2.Request.SerializeToString, | |||
response_deserializer=calculator__pb2.FloatResponse.FromString, | |||
) | |||
self.Divide = channel.unary_unary( | |||
'/Calculator/Divide', | |||
request_serializer=calculator__pb2.Request.SerializeToString, | |||
response_deserializer=calculator__pb2.FloatResponse.FromString, | |||
) | |||
class CalculatorServicer(object): | |||
"""Missing associated documentation comment in .proto file.""" | |||
def Add(self, request, context): | |||
"""Missing associated documentation comment in .proto file.""" | |||
context.set_code(grpc.StatusCode.UNIMPLEMENTED) | |||
context.set_details('Method not implemented!') | |||
raise NotImplementedError('Method not implemented!') | |||
def Substract(self, request, context): | |||
"""Missing associated documentation comment in .proto file.""" | |||
context.set_code(grpc.StatusCode.UNIMPLEMENTED) | |||
context.set_details('Method not implemented!') | |||
raise NotImplementedError('Method not implemented!') | |||
def Multiply(self, request, context): | |||
"""Missing associated documentation comment in .proto file.""" | |||
context.set_code(grpc.StatusCode.UNIMPLEMENTED) | |||
context.set_details('Method not implemented!') | |||
raise NotImplementedError('Method not implemented!') | |||
def Divide(self, request, context): | |||
"""Missing associated documentation comment in .proto file.""" | |||
context.set_code(grpc.StatusCode.UNIMPLEMENTED) | |||
context.set_details('Method not implemented!') | |||
raise NotImplementedError('Method not implemented!') | |||
def add_CalculatorServicer_to_server(servicer, server): | |||
rpc_method_handlers = { | |||
'Add': grpc.unary_unary_rpc_method_handler( | |||
servicer.Add, | |||
request_deserializer=calculator__pb2.Request.FromString, | |||
response_serializer=calculator__pb2.FloatResponse.SerializeToString, | |||
), | |||
'Substract': grpc.unary_unary_rpc_method_handler( | |||
servicer.Substract, | |||
request_deserializer=calculator__pb2.Request.FromString, | |||
response_serializer=calculator__pb2.FloatResponse.SerializeToString, | |||
), | |||
'Multiply': grpc.unary_unary_rpc_method_handler( | |||
servicer.Multiply, | |||
request_deserializer=calculator__pb2.Request.FromString, | |||
response_serializer=calculator__pb2.FloatResponse.SerializeToString, | |||
), | |||
'Divide': grpc.unary_unary_rpc_method_handler( | |||
servicer.Divide, | |||
request_deserializer=calculator__pb2.Request.FromString, | |||
response_serializer=calculator__pb2.FloatResponse.SerializeToString, | |||
), | |||
} | |||
generic_handler = grpc.method_handlers_generic_handler( | |||
'Calculator', rpc_method_handlers) | |||
server.add_generic_rpc_handlers((generic_handler,)) | |||
# This class is part of an EXPERIMENTAL API. | |||
class Calculator(object): | |||
"""Missing associated documentation comment in .proto file.""" | |||
@staticmethod | |||
def Add(request, | |||
target, | |||
options=(), | |||
channel_credentials=None, | |||
call_credentials=None, | |||
insecure=False, | |||
compression=None, | |||
wait_for_ready=None, | |||
timeout=None, | |||
metadata=None): | |||
return grpc.experimental.unary_unary(request, target, '/Calculator/Add', | |||
calculator__pb2.Request.SerializeToString, | |||
calculator__pb2.FloatResponse.FromString, | |||
options, channel_credentials, | |||
insecure, call_credentials, compression, wait_for_ready, timeout, metadata) | |||
@staticmethod | |||
def Substract(request, | |||
target, | |||
options=(), | |||
channel_credentials=None, | |||
call_credentials=None, | |||
insecure=False, | |||
compression=None, | |||
wait_for_ready=None, | |||
timeout=None, | |||
metadata=None): | |||
return grpc.experimental.unary_unary(request, target, '/Calculator/Substract', | |||
calculator__pb2.Request.SerializeToString, | |||
calculator__pb2.FloatResponse.FromString, | |||
options, channel_credentials, | |||
insecure, call_credentials, compression, wait_for_ready, timeout, metadata) | |||
@staticmethod | |||
def Multiply(request, | |||
target, | |||
options=(), | |||
channel_credentials=None, | |||
call_credentials=None, | |||
insecure=False, | |||
compression=None, | |||
wait_for_ready=None, | |||
timeout=None, | |||
metadata=None): | |||
return grpc.experimental.unary_unary(request, target, '/Calculator/Multiply', | |||
calculator__pb2.Request.SerializeToString, | |||
calculator__pb2.FloatResponse.FromString, | |||
options, channel_credentials, | |||
insecure, call_credentials, compression, wait_for_ready, timeout, metadata) | |||
@staticmethod | |||
def Divide(request, | |||
target, | |||
options=(), | |||
channel_credentials=None, | |||
call_credentials=None, | |||
insecure=False, | |||
compression=None, | |||
wait_for_ready=None, | |||
timeout=None, | |||
metadata=None): | |||
return grpc.experimental.unary_unary(request, target, '/Calculator/Divide', | |||
calculator__pb2.Request.SerializeToString, | |||
calculator__pb2.FloatResponse.FromString, | |||
options, channel_credentials, | |||
insecure, call_credentials, compression, wait_for_ready, timeout, metadata) |
@@ -0,0 +1,40 @@ | |||
import grpc | |||
import calculator_pb2 as service | |||
import calculator_pb2_grpc as stub | |||
import random | |||
def Add(a, b): | |||
args = service.Request(a=a, b=b) | |||
response = stub.Add(args) | |||
print(f"{a} + {b} = {response.ans}") | |||
def Substract(a, b): | |||
args = service.Request(a=a, b=b) | |||
response = stub.Substract(args) | |||
print(f"{a} - {b} = {response.ans}") | |||
def Multiply(a, b): | |||
args = service.Request(a=a, b=b) | |||
response = stub.Multiply(args) | |||
print(f"{a} * {b} = {response.ans}") | |||
def Divide(a, b): | |||
args = service.Request(a=a, b=b) | |||
response = stub.Divide(args) | |||
print(f"{a} / {b} = {response.ans}") | |||
if __name__ == '__main__': | |||
with grpc.insecure_channel('localhost:1234') as channel: | |||
stub = stub.CalculatorStub(channel) | |||
Add(10, 2) | |||
Substract(10, 2) | |||
Multiply(10, 2) | |||
Divide(10, 2) | |||
Divide(10, 0) |
@@ -0,0 +1,49 @@ | |||
from concurrent.futures import ThreadPoolExecutor | |||
import grpc | |||
import calculator_pb2 as stub | |||
import calculator_pb2_grpc as service | |||
import math | |||
SERVER_ADDR = '0.0.0.0:1234' | |||
class Calculator(service.CalculatorServicer): | |||
def Add(self, request, context): | |||
a = request.a | |||
b = request.b | |||
print(f'Add({a}, {b})') | |||
return stub.FloatResponse(ans=a+b) | |||
def Substract(self, request, context): | |||
a = request.a | |||
b = request.b | |||
print(f'Substract({a}, {b})') | |||
return stub.FloatResponse(ans=a-b) | |||
def Multiply(self, request, context): | |||
a = request.a | |||
b = request.b | |||
print(f'Multiply({a}, {b})') | |||
return stub.FloatResponse(ans=a*b) | |||
def Divide(self, request, context): | |||
a = request.a | |||
b = request.b | |||
print(f'Divide({a}, {b})') | |||
try: | |||
return stub.FloatResponse(ans=a/b) | |||
except Exception as e: | |||
return stub.FloatResponse(ans=math.nan) | |||
if __name__ == '__main__': | |||
try: | |||
server = grpc.server(ThreadPoolExecutor(max_workers=30)) | |||
service.add_CalculatorServicer_to_server(Calculator(), server) | |||
server.add_insecure_port(SERVER_ADDR) | |||
server.start() | |||
print(f'listening on {SERVER_ADDR}') | |||
server.wait_for_termination() | |||
except KeyboardInterrupt: | |||
print('Interrupted by user. Shutting down...') |
@@ -0,0 +1,63 @@ | |||
import argparse | |||
import math | |||
import os | |||
import socket | |||
MSS = 20476 # MSS = Server buffer size (20480) - data header size (4) | |||
def await_ack(packet): | |||
s.settimeout(1) | |||
while True: | |||
try: | |||
data, addr = s.recvfrom(1024) | |||
print(f"Server: {data.decode()}") | |||
received_ack_seqno = int(data[2:3].decode()) | |||
expected_ack_seqno = (int(packet[2:3].decode()) + 1) % 2 | |||
if received_ack_seqno != expected_ack_seqno: | |||
continue | |||
return True | |||
except KeyboardInterrupt: | |||
print("Client: Exiting...") | |||
exit() | |||
except socket.timeout: # Expected ACK was not received within one second | |||
print(f"Client: Retransmitting...") | |||
s.sendto(packet, (server_ip, server_port)) | |||
if __name__ == "__main__": | |||
parser = argparse.ArgumentParser() | |||
parser.add_argument("server_addr", type=str) | |||
parser.add_argument("file_path", type=str) | |||
args = parser.parse_args() | |||
server_ip, server_port = args.server_addr.split(":") | |||
server_port = int(server_port) | |||
file_name = args.file_path.split(os.path.sep)[-1] | |||
file_size = os.path.getsize(args.file_path) | |||
with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as s: | |||
s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 30000) | |||
# Check file existence | |||
if not os.path.exists(args.file_path): | |||
print(f"Client: no such file: {args.file_path}") | |||
exit() | |||
# Send start packet to server | |||
packet = f"s|0|{file_name}|{file_size}".encode() | |||
print(f"Client: {packet.decode()}") | |||
s.sendto(packet, (server_ip, server_port)) | |||
# Wait for ACK for the given packet | |||
await_ack(packet) | |||
# Upload file to server | |||
seqno = 1 | |||
with open(args.file_path, "rb") as f: | |||
for i in range(math.ceil(file_size / MSS)): | |||
print(f"Client: d|{seqno}|chunk{i + 1}") | |||
packet = bytes(f"d|{seqno}|", "utf-8") + f.read(MSS) | |||
s.sendto(packet, (server_ip, server_port)) | |||
await_ack(packet) | |||
seqno = (seqno + 1) % 2 |
@@ -0,0 +1 @@ | |||
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. |
@@ -0,0 +1 @@ | |||
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. |
@@ -0,0 +1,75 @@ | |||
import argparse | |||
import math | |||
import os | |||
import socket | |||
IP_ADDR = "0.0.0.0" | |||
MSS = 24000 # MSS = Server buffer size (20480) - data header size (4) | |||
def await_ack(packet, addr): | |||
s.settimeout(1) | |||
while True: | |||
try: | |||
data, addr = s.recvfrom(MSS) | |||
return (data, addr) | |||
except KeyboardInterrupt: | |||
print("Server: Exiting...") | |||
exit() | |||
except socket.timeout: | |||
print("Server: Retransmitting...") | |||
s.sendto(packet, addr) | |||
if __name__ == "__main__": | |||
parser = argparse.ArgumentParser() | |||
parser.add_argument("port", type=str) | |||
args = parser.parse_args() | |||
server_port = args.port | |||
server_port = int(server_port) | |||
with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as s: | |||
s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 30000) | |||
s.bind((IP_ADDR, server_port)) | |||
print(f'Listening on {IP_ADDR}:{server_port}...') | |||
data, addr = s.recvfrom(MSS) | |||
columns = data.decode('utf-8', 'replace').split('|') | |||
print(columns) | |||
if os.path.isfile(f'./{columns[2]}'): | |||
print(f'file {columns[2]} exists. Overwriting...') | |||
f = open(columns[2], 'wb') | |||
if columns[0] == 's': | |||
message = f'a|{(int(columns[1])+1)%2}' | |||
s.sendto(message.encode(), addr) | |||
data, addr = await_ack(message.encode(), addr) | |||
bytes_received = 0 | |||
last = b'0' | |||
while bytes_received < int(columns[3]): | |||
new_columns = data.split(b'|') | |||
if new_columns[0] != b'd': | |||
data, addr = await_ack(message.encode(), addr) | |||
continue | |||
if new_columns[1] == last: | |||
data, addr = await_ack(message.encode(), addr) | |||
continue | |||
last = new_columns[1] | |||
bindata = new_columns[2] | |||
for i in new_columns[3:]: | |||
bindata = bindata + (b'|') + i | |||
f.write(bindata) | |||
bytes_received += len(bindata) | |||
print(f'Received: {bytes_received}/{columns[3]}') | |||
message = f'a|{(int(new_columns[1])+1)%2}' | |||
s.sendto(message.encode(), addr) | |||
data, addr = await_ack(message.encode(), addr) | |||
print(f'file {columns[2]} was successfully uploaded. Exiting...') | |||
else: | |||
exit(1) |
@@ -0,0 +1,2 @@ | |||
frames | |||
AmirlanSharipov.gif |
@@ -0,0 +1,81 @@ | |||
# BEFORE CLIENT OPT: | |||
# Frames download time: 4.928671836853027 | |||
# GIF creation time: 5.02190637588501 | |||
# AFTER CLIENT OPT: | |||
# Frames download time: 3.885207176208496 | |||
# GIF creation time: 4.356576204299927 | |||
import os | |||
import socket | |||
import time | |||
import threading | |||
import multiprocessing | |||
from PIL import Image | |||
SERVER_URL = '127.0.0.1:1234' | |||
FILE_NAME = 'AmirlanSharipov.gif' | |||
CLIENT_BUFFER = 1024 | |||
FRAME_COUNT = 5000 | |||
MAXTHREADS = 8 | |||
MAXPROCESSES = 8 | |||
pool_sema = threading.BoundedSemaphore(value=MAXTHREADS) | |||
def routine_save_image(i): | |||
with pool_sema: | |||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | |||
ip, port = SERVER_URL.split(':') | |||
s.connect((ip, int(port))) | |||
image = b'' | |||
while True: | |||
packet = s.recv(CLIENT_BUFFER) | |||
if not packet: | |||
break | |||
image += packet | |||
with open(f'frames/{i}.png', 'wb') as f: | |||
f.write(image) | |||
def download_frames(): | |||
t0 = time.time() | |||
if not os.path.exists('frames'): | |||
os.mkdir('frames') | |||
threads = list() | |||
for i in range(FRAME_COUNT): | |||
t = threading.Thread(target=routine_save_image, args=(i, )) | |||
threads.append(t) | |||
for t in threads: | |||
t.start() | |||
for t in threads: | |||
t.join() | |||
return time.time() - t0 | |||
def get_RGBA(fname): | |||
return Image.open(fname).convert('RGBA') | |||
def create_gif(): | |||
t0 = time.time() | |||
frame_list = list() | |||
for frame_id in range(FRAME_COUNT): | |||
frame_list.append(f'frames/{frame_id}.png') | |||
with multiprocessing.Pool(MAXPROCESSES) as p: | |||
frames = p.map(get_RGBA, frame_list) | |||
frames[0].save(FILE_NAME, format="GIF", | |||
append_images=frames[1:], save_all=True, duration=500, loop=0) | |||
return time.time() - t0 | |||
if __name__ == '__main__': | |||
print(f"Frames download time: {download_frames()}") | |||
print(f"GIF creation time: {create_gif()}") |
@@ -0,0 +1,58 @@ | |||
import os | |||
import socket | |||
import time | |||
import random | |||
import threading | |||
import io | |||
from PIL import Image | |||
SERVER_URL = '0.0.0.0:1234' | |||
FRAME_COUNT = 5000 | |||
BACKLOG = 100 | |||
def routine_send_img(connection): | |||
img = Image.new(mode="RGBA", size=(10, 10)) | |||
pix = img.load() | |||
for i in range(10): | |||
for j in range(10): | |||
r = random.randrange(0, 255) | |||
g = random.randrange(0, 255) | |||
b = random.randrange(0, 255) | |||
pix[i, j] = (r, g, b) | |||
output = io.BytesIO() | |||
img.save(output, format='PNG') | |||
connection.send(output.getvalue()) | |||
connection.close() | |||
def main(): | |||
ip, port = SERVER_URL.split(':') | |||
port = int(port) | |||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |||
s.bind((ip, port)) | |||
s.listen(BACKLOG) | |||
print(f'Listening on port {port}...') | |||
threads = list() | |||
while True: | |||
connection, addr = s.accept() | |||
#print(f'connected from {addr}...') | |||
t = threading.Thread(target=routine_send_img, args=(connection, )) | |||
threads.append(t) | |||
t.start() | |||
print(f'Sending an image to {addr}...') | |||
for t in threads: | |||
t.join() | |||
s.close() | |||
if __name__ == '__main__': | |||
main() |
@@ -0,0 +1,7 @@ | |||
BEFORE CLIENT OPT: | |||
Frames download time: 4.928671836853027 | |||
GIF creation time: 5.02190637588501 | |||
AFTER CLIENT MULTIPROCESSING GIF: | |||
Frames download time: 3.885207176208496 | |||
GIF creation time: 4.356576204299927 |
@@ -0,0 +1,2 @@ | |||
venv | |||
rabbitmq |
@@ -0,0 +1,14 @@ | |||
version: "3.2" | |||
services: | |||
rabbitmq3: | |||
container_name: "rabbitmq" | |||
image: rabbitmq:3.8-management-alpine | |||
environment: | |||
- RABBITMQ_DEFAULT_USER=rabbit | |||
- RABBITMQ_DEFAULT_PASS=1234 | |||
ports: | |||
- '5672:5672' # AMQP protocol port | |||
- '15672:15672' # HTTP management UI | |||
volumes: | |||
- ./rabbitmq/data/:/var/lib/rabbitmq/ | |||
- ./rabbitmq/log/:/var/log/rabbitmq |
@@ -0,0 +1,45 @@ | |||
import json | |||
from pika import BlockingConnection, ConnectionParameters, PlainCredentials | |||
from pika.exchange_type import ExchangeType | |||
RMQ_HOST = 'localhost' | |||
RMQ_USER = 'rabbit' | |||
RMQ_PASS = '1234' | |||
EXCHANGE_NAME = 'amq.topic' | |||
ROUTING_KEY_CURRENT = 'rep.current' | |||
ROUTING_KEY_AVG = 'rep.average' | |||
def main(): | |||
connection = BlockingConnection( | |||
ConnectionParameters( | |||
host=RMQ_HOST, | |||
credentials=PlainCredentials(RMQ_USER, RMQ_PASS) | |||
) | |||
) | |||
try: | |||
channel = connection.channel() | |||
result = channel.queue_declare(queue=ROUTING_KEY_CURRENT) | |||
channel.queue_bind(exchange=EXCHANGE_NAME, queue=result.method.queue) | |||
result = channel.queue_declare(queue=ROUTING_KEY_AVG) | |||
channel.queue_bind(exchange=EXCHANGE_NAME, queue=result.method.queue) | |||
while True: | |||
query = input('Enter Query: ') | |||
if query == 'current': | |||
channel.basic_publish(exchange=EXCHANGE_NAME, | |||
routing_key=ROUTING_KEY_CURRENT, | |||
body=query) | |||
if query == 'average': | |||
channel.basic_publish(exchange=EXCHANGE_NAME, | |||
routing_key=ROUTING_KEY_AVG, | |||
body=query) | |||
except KeyboardInterrupt: | |||
connection.close() | |||
print('Interrupted by user. Shutting down...') | |||
if __name__ == '__main__': | |||
main() |
@@ -0,0 +1,40 @@ | |||
import json | |||
from datetime import datetime | |||
from pika import BlockingConnection, ConnectionParameters, PlainCredentials | |||
from pika.exchange_type import ExchangeType | |||
RMQ_HOST = 'localhost' | |||
RMQ_USER = 'rabbit' | |||
RMQ_PASS = '1234' | |||
EXCHANGE_NAME = 'amq.topic' | |||
ROUTING_KEY = 'co2.sensor' | |||
def main(): | |||
connection = BlockingConnection( | |||
ConnectionParameters( | |||
host=RMQ_HOST, | |||
credentials=PlainCredentials(RMQ_USER, RMQ_PASS) | |||
) | |||
) | |||
try: | |||
channel = connection.channel() | |||
result = channel.queue_declare(queue=ROUTING_KEY) | |||
channel.queue_bind(exchange=EXCHANGE_NAME, queue=result.method.queue) | |||
while True: | |||
co2 = int(input('Enter CO2 level: ')) | |||
message = json.dumps({'time': str(datetime.utcnow()), 'value': co2}) | |||
print(message) | |||
channel.basic_publish(exchange=EXCHANGE_NAME, | |||
routing_key=ROUTING_KEY, | |||
body=message) | |||
connection.close() | |||
except KeyboardInterrupt: | |||
connection.close() | |||
print('Interrupted by user. Shutting down...') | |||
if __name__ == '__main__': | |||
main() |
@@ -0,0 +1,4 @@ | |||
{"time": "2023-04-13 18:39:29.566244", "value": 123} | |||
{"time": "2023-04-13 18:39:30.650069", "value": 500} | |||
{"time": "2023-04-13 18:39:31.322338", "value": 501} | |||
{"time": "2023-04-13 19:09:42.439397", "value": 1234} |
@@ -0,0 +1,53 @@ | |||
import json | |||
from pika import BlockingConnection, ConnectionParameters, PlainCredentials | |||
from pika.exchange_type import ExchangeType | |||
RMQ_HOST = 'localhost' | |||
RMQ_USER = 'rabbit' | |||
RMQ_PASS = '1234' | |||
EXCHANGE_NAME = 'amq.topic' | |||
ROUTING_KEY = 'co2.*' | |||
def callback(channel, method, properties, body): | |||
log_file = open('receiver.log', 'a') | |||
log_file.write(body.decode() + '\n') | |||
message = json.loads(body) | |||
status = 'OK' | |||
if message['value'] > 500: | |||
status = 'WARNING' | |||
print(f"{message['time']}: {status}") | |||
log_file.close() | |||
channel.basic_ack(delivery_tag=method.delivery_tag) | |||
def main(): | |||
connection = BlockingConnection( | |||
ConnectionParameters( | |||
host=RMQ_HOST, | |||
credentials=PlainCredentials(RMQ_USER, RMQ_PASS) | |||
) | |||
) | |||
try: | |||
channel = connection.channel() | |||
result = channel.queue_declare(queue=ROUTING_KEY) | |||
channel.queue_bind(exchange=EXCHANGE_NAME, queue=result.method.queue) | |||
channel.basic_consume(queue=ROUTING_KEY, | |||
on_message_callback=callback) | |||
print('[*] Waiting for CO2 data. Press CTRL+C to exit') | |||
channel.start_consuming() | |||
connection.close() | |||
except KeyboardInterrupt: | |||
connection.close() | |||
print('Interrupted by user. Shutting down...') | |||
if __name__ == '__main__': | |||
main() |
@@ -0,0 +1,60 @@ | |||
import json | |||
import os | |||
from datetime import datetime | |||
from pika import BlockingConnection, ConnectionParameters, PlainCredentials | |||
from pika.exchange_type import ExchangeType | |||
RMQ_HOST = 'localhost' | |||
RMQ_USER = 'rabbit' | |||
RMQ_PASS = '1234' | |||
EXCHANGE_NAME = 'amq.topic' | |||
ROUTING_KEY = 'rep.*' | |||
def extract_value(dict_line): | |||
return dict_line['value'] | |||
def callback(channel, method, properties, body): | |||
log_file = open('receiver.log', 'r') | |||
lines = log_file.readlines() | |||
dict_lines = list(map(json.loads, lines)) | |||
if body == b'current': | |||
print(f"{datetime.utcnow()}: Latest CO2 level is {dict_lines[-1]['value']}") | |||
else: | |||
values = list(map(extract_value, dict_lines)) | |||
avg = sum(values) / len(values) | |||
print(f"{datetime.utcnow()}: Average CO2 level is {avg}") | |||
log_file.close() | |||
channel.basic_ack(delivery_tag=method.delivery_tag) | |||
def main(): | |||
connection = BlockingConnection( | |||
ConnectionParameters( | |||
host=RMQ_HOST, | |||
credentials=PlainCredentials(RMQ_USER, RMQ_PASS) | |||
) | |||
) | |||
try: | |||
channel = connection.channel() | |||
result = channel.queue_declare(queue=ROUTING_KEY) | |||
channel.queue_bind(exchange=EXCHANGE_NAME, queue=result.method.queue) | |||
channel.basic_consume(queue=ROUTING_KEY, | |||
on_message_callback=callback) | |||
print('[*] Waiting for queries from the control tower. Press CTRL+C to exit') | |||
channel.start_consuming() | |||
connection.close() | |||
except KeyboardInterrupt: | |||
connection.close() | |||
print('Interrupted by user. Shutting down...') | |||
if __name__ == '__main__': | |||
main() |
@@ -0,0 +1 @@ | |||
venv |
@@ -0,0 +1,2 @@ | |||
__pycache__ | |||
db.sqlite |
@@ -0,0 +1,42 @@ | |||
import grpc | |||
import schema_pb2 as service | |||
import schema_pb2_grpc as stub | |||
def put_user(user_id, user_name): | |||
args = service.User(user_id=user_id, user_name=user_name) | |||
response = stub.PutUser(args) | |||
print(f"PutUser({user_id}, '{user_name}') = {response.status}") | |||
def get_users(): | |||
args = service.EmptyMessage() | |||
response = stub.GetUsers(args) | |||
result = {} | |||
for user in response.users: | |||
result[user.user_id] = user.user_name | |||
print(f"GetUsers() = {result}") | |||
def delete_user(user_id): | |||
args = service.User(user_id=user_id) | |||
response = stub.DeleteUser(args) | |||
print(f"DeleteUser({user_id}) = {response.status}") | |||
if __name__ == '__main__': | |||
with grpc.insecure_channel('localhost:1234') as channel: | |||
stub = stub.DatabaseStub(channel) | |||
# Create four users | |||
[put_user(i, f"User{i}") for i in range(1, 5)] | |||
# Update the usename of the second user | |||
put_user(2, "User2_updated") | |||
# Delete the thrid user | |||
delete_user(3) | |||
# Retrieve all users | |||
get_users() |
@@ -0,0 +1,29 @@ | |||
syntax = 'proto3'; | |||
service Database { | |||
rpc GetUsers(EmptyMessage) returns (UsersResponse); | |||
rpc DeleteUser(User) returns (Response); | |||
rpc PutUser(User) returns (Response); | |||
} | |||
message User { | |||
int32 user_id = 1; | |||
string user_name = 2; | |||
} | |||
message UserDelete { | |||
int32 user_id = 1; | |||
} | |||
message EmptyMessage { | |||
} | |||
message Response { | |||
bool status = 1; | |||
} | |||
message UsersResponse { | |||
repeated User users = 1; | |||
} | |||
@@ -0,0 +1,35 @@ | |||
# -*- coding: utf-8 -*- | |||
# Generated by the protocol buffer compiler. DO NOT EDIT! | |||
# source: schema.proto | |||
"""Generated protocol buffer code.""" | |||
from google.protobuf.internal import builder as _builder | |||
from google.protobuf import descriptor as _descriptor | |||
from google.protobuf import descriptor_pool as _descriptor_pool | |||
from google.protobuf import symbol_database as _symbol_database | |||
# @@protoc_insertion_point(imports) | |||
_sym_db = _symbol_database.Default() | |||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cschema.proto\"*\n\x04User\x12\x0f\n\x07user_id\x18\x01 \x01(\x05\x12\x11\n\tuser_name\x18\x02 \x01(\t\"\x1d\n\nUserDelete\x12\x0f\n\x07user_id\x18\x01 \x01(\x05\"\x0e\n\x0c\x45mptyMessage\"\x1a\n\x08Response\x12\x0e\n\x06status\x18\x01 \x01(\x08\"%\n\rUsersResponse\x12\x14\n\x05users\x18\x01 \x03(\x0b\x32\x05.User2r\n\x08\x44\x61tabase\x12)\n\x08GetUsers\x12\r.EmptyMessage\x1a\x0e.UsersResponse\x12\x1e\n\nDeleteUser\x12\x05.User\x1a\t.Response\x12\x1b\n\x07PutUser\x12\x05.User\x1a\t.Responseb\x06proto3') | |||
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) | |||
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'schema_pb2', globals()) | |||
if _descriptor._USE_C_DESCRIPTORS == False: | |||
DESCRIPTOR._options = None | |||
_USER._serialized_start=16 | |||
_USER._serialized_end=58 | |||
_USERDELETE._serialized_start=60 | |||
_USERDELETE._serialized_end=89 | |||
_EMPTYMESSAGE._serialized_start=91 | |||
_EMPTYMESSAGE._serialized_end=105 | |||
_RESPONSE._serialized_start=107 | |||
_RESPONSE._serialized_end=133 | |||
_USERSRESPONSE._serialized_start=135 | |||
_USERSRESPONSE._serialized_end=172 | |||
_DATABASE._serialized_start=174 | |||
_DATABASE._serialized_end=288 | |||
# @@protoc_insertion_point(module_scope) |
@@ -0,0 +1,132 @@ | |||
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! | |||
"""Client and server classes corresponding to protobuf-defined services.""" | |||
import grpc | |||
import schema_pb2 as schema__pb2 | |||
class DatabaseStub(object): | |||
"""Missing associated documentation comment in .proto file.""" | |||
def __init__(self, channel): | |||
"""Constructor. | |||
Args: | |||
channel: A grpc.Channel. | |||
""" | |||
self.GetUsers = channel.unary_unary( | |||
'/Database/GetUsers', | |||
request_serializer=schema__pb2.EmptyMessage.SerializeToString, | |||
response_deserializer=schema__pb2.UsersResponse.FromString, | |||
) | |||
self.DeleteUser = channel.unary_unary( | |||
'/Database/DeleteUser', | |||
request_serializer=schema__pb2.User.SerializeToString, | |||
response_deserializer=schema__pb2.Response.FromString, | |||
) | |||
self.PutUser = channel.unary_unary( | |||
'/Database/PutUser', | |||
request_serializer=schema__pb2.User.SerializeToString, | |||
response_deserializer=schema__pb2.Response.FromString, | |||
) | |||
class DatabaseServicer(object): | |||
"""Missing associated documentation comment in .proto file.""" | |||
def GetUsers(self, request, context): | |||
"""Missing associated documentation comment in .proto file.""" | |||
context.set_code(grpc.StatusCode.UNIMPLEMENTED) | |||
context.set_details('Method not implemented!') | |||
raise NotImplementedError('Method not implemented!') | |||
def DeleteUser(self, request, context): | |||
"""Missing associated documentation comment in .proto file.""" | |||
context.set_code(grpc.StatusCode.UNIMPLEMENTED) | |||
context.set_details('Method not implemented!') | |||
raise NotImplementedError('Method not implemented!') | |||
def PutUser(self, request, context): | |||
"""Missing associated documentation comment in .proto file.""" | |||
context.set_code(grpc.StatusCode.UNIMPLEMENTED) | |||
context.set_details('Method not implemented!') | |||
raise NotImplementedError('Method not implemented!') | |||
def add_DatabaseServicer_to_server(servicer, server): | |||
rpc_method_handlers = { | |||
'GetUsers': grpc.unary_unary_rpc_method_handler( | |||
servicer.GetUsers, | |||
request_deserializer=schema__pb2.EmptyMessage.FromString, | |||
response_serializer=schema__pb2.UsersResponse.SerializeToString, | |||
), | |||
'DeleteUser': grpc.unary_unary_rpc_method_handler( | |||
servicer.DeleteUser, | |||
request_deserializer=schema__pb2.User.FromString, | |||
response_serializer=schema__pb2.Response.SerializeToString, | |||
), | |||
'PutUser': grpc.unary_unary_rpc_method_handler( | |||
servicer.PutUser, | |||
request_deserializer=schema__pb2.User.FromString, | |||
response_serializer=schema__pb2.Response.SerializeToString, | |||
), | |||
} | |||
generic_handler = grpc.method_handlers_generic_handler( | |||
'Database', rpc_method_handlers) | |||
server.add_generic_rpc_handlers((generic_handler,)) | |||
# This class is part of an EXPERIMENTAL API. | |||
class Database(object): | |||
"""Missing associated documentation comment in .proto file.""" | |||
@staticmethod | |||
def GetUsers(request, | |||
target, | |||
options=(), | |||
channel_credentials=None, | |||
call_credentials=None, | |||
insecure=False, | |||
compression=None, | |||
wait_for_ready=None, | |||
timeout=None, | |||
metadata=None): | |||
return grpc.experimental.unary_unary(request, target, '/Database/GetUsers', | |||
schema__pb2.EmptyMessage.SerializeToString, | |||
schema__pb2.UsersResponse.FromString, | |||
options, channel_credentials, | |||
insecure, call_credentials, compression, wait_for_ready, timeout, metadata) | |||
@staticmethod | |||
def DeleteUser(request, | |||
target, | |||
options=(), | |||
channel_credentials=None, | |||
call_credentials=None, | |||
insecure=False, | |||
compression=None, | |||
wait_for_ready=None, | |||
timeout=None, | |||
metadata=None): | |||
return grpc.experimental.unary_unary(request, target, '/Database/DeleteUser', | |||
schema__pb2.User.SerializeToString, | |||
schema__pb2.Response.FromString, | |||
options, channel_credentials, | |||
insecure, call_credentials, compression, wait_for_ready, timeout, metadata) | |||
@staticmethod | |||
def PutUser(request, | |||
target, | |||
options=(), | |||
channel_credentials=None, | |||
call_credentials=None, | |||
insecure=False, | |||
compression=None, | |||
wait_for_ready=None, | |||
timeout=None, | |||
metadata=None): | |||
return grpc.experimental.unary_unary(request, target, '/Database/PutUser', | |||
schema__pb2.User.SerializeToString, | |||
schema__pb2.Response.FromString, | |||
options, channel_credentials, | |||
insecure, call_credentials, compression, wait_for_ready, timeout, metadata) |
@@ -0,0 +1,78 @@ | |||
import sqlite3 | |||
from concurrent.futures import ThreadPoolExecutor | |||
import grpc | |||
import schema_pb2 as stub | |||
import schema_pb2_grpc as service | |||
SERVER_ADDR = '0.0.0.0:1234' | |||
class Database(service.DatabaseServicer): | |||
def PutUser(self, request, context): | |||
try: | |||
con = sqlite3.connect('db.sqlite') | |||
cur = con.cursor() | |||
print(request.user_id, request.user_name) | |||
cur.execute("INSERT OR REPLACE INTO User (id, name) VALUES (?, ?)", | |||
(request.user_id, request.user_name)) | |||
con.commit() | |||
con.close() | |||
return stub.Response(status=1) | |||
except Exception as inst: | |||
con.close() | |||
return stub.Response(status=0) | |||
def GetUsers(self, request, context): | |||
try: | |||
con = sqlite3.connect('db.sqlite') | |||
cur = con.cursor() | |||
res = cur.execute('SELECT id, name FROM User') | |||
users = [] | |||
for user in res.fetchall(): | |||
users.append({'user_id': user[0], 'user_name': user[1]}) | |||
con.commit() | |||
con.close() | |||
return stub.UsersResponse(users=users) | |||
except Exception: | |||
con.close() | |||
return stub.UsersResponse(users=[]) | |||
def DeleteUser(self, request, context): | |||
try: | |||
con = sqlite3.connect('db.sqlite') | |||
cur = con.cursor() | |||
cur.execute('DELETE FROM User WHERE id=?', (request.user_id, )) | |||
con.commit() | |||
con.close() | |||
return stub.Response(status=1) | |||
except Exception: | |||
con.close() | |||
return stub.Response(status=0) | |||
def create_table(): | |||
try: | |||
con = sqlite3.connect('db.sqlite') | |||
cur = con.cursor() | |||
cur.execute('CREATE TABLE IF NOT EXISTS User(id INTEGER PRIMARY KEY, name TEXT)') | |||
con.commit() | |||
con.close() | |||
print('Created a table') | |||
except Exception: | |||
print('DOESNT WORK') | |||
if __name__ == '__main__': | |||
create_table() | |||
try: | |||
server = grpc.server(ThreadPoolExecutor(max_workers=30)) | |||
service.add_DatabaseServicer_to_server(Database(), server) | |||
server.add_insecure_port(SERVER_ADDR) | |||
server.start() | |||
print(f'listening on {SERVER_ADDR}') | |||
server.wait_for_termination() | |||
except KeyboardInterrupt: | |||
print('Interrupted by user. Shutting down...') |
@@ -0,0 +1,7 @@ | |||
FROM python:alpine | |||
WORKDIR /app | |||
ENV PYTHONUNBUFFERED=1 | |||
COPY *.py ./ |
@@ -0,0 +1,29 @@ | |||
from xmlrpc.client import ServerProxy | |||
import random | |||
M = 5 | |||
PORT = 1234 | |||
RING = [2, 7, 11, 17, 22, 27] | |||
def lookup(node_id: int, key: int) -> None: | |||
"""Calls the get method for a remote node over RPC and print the result""" | |||
with ServerProxy(f'http://node_{node_id}:{PORT}') as node: | |||
print(f"lookup({node_id}, {key}) = {node.get(key)}") | |||
if __name__ == '__main__': | |||
print("Client started") | |||
# Asking a random node to insert an entry into the DHT | |||
# String keys should be consistently hashed to an integer value between 0 and (2**M)-1 | |||
for i in range(2 ** M): | |||
with ServerProxy(f'http://node_{random.choice(RING)}:{PORT}') as node: | |||
node.put(i, f"value_{i}") | |||
# Querying a DHT node for the value of a certain key. | |||
lookup(2, 20) | |||
lookup(11, 15) | |||
lookup(17, 1) | |||
lookup(22, 27) | |||
lookup(2, 5) |
@@ -0,0 +1,45 @@ | |||
version: '3' | |||
services: | |||
client: | |||
image: chord | |||
build: . | |||
container_name: client | |||
command: python3 client.py | |||
depends_on: [ node_2, node_7, node_11, node_17, node_22, node_27 ] | |||
node_2: | |||
image: chord | |||
build: . | |||
container_name: node_2 | |||
command: sh -c 'python3 node*.py 2' | |||
node_7: | |||
image: chord | |||
build: . | |||
container_name: node_7 | |||
command: sh -c 'python3 node*.py 7' | |||
node_11: | |||
image: chord | |||
build: . | |||
container_name: node_11 | |||
command: sh -c 'python3 node*.py 11' | |||
node_17: | |||
image: chord | |||
build: . | |||
container_name: node_17 | |||
command: sh -c 'python3 node*.py 17' | |||
node_22: | |||
image: chord | |||
build: . | |||
container_name: node_22 | |||
command: sh -c 'python3 node*.py 22' | |||
node_27: | |||
image: chord | |||
build: . | |||
container_name: node_27 | |||
command: sh -c 'python3 node*.py 27' |
@@ -0,0 +1,142 @@ | |||
from argparse import ArgumentParser | |||
from bisect import bisect_left, bisect_right | |||
from threading import Thread | |||
from xmlrpc.client import ServerProxy | |||
from xmlrpc.server import SimpleXMLRPCServer | |||
import traceback | |||
M = 5 | |||
PORT = 1234 | |||
RING = [2, 7, 11, 17, 22, 27] | |||
class Node: | |||
def __init__(self, node_id): | |||
"""Initializes the node properties and constructs the finger table according to the Chord formula""" | |||
# Assuming that the program knows all the nodes and stores them in a sorted array RING | |||
self.node_id = node_id | |||
self.finger_table = [] | |||
self.successor_id = RING[(RING.index(node_id) + 1) % len(RING)] | |||
self.predecessor_id = RING[RING.index(node_id) - 1] | |||
self.table = {} | |||
for i in range(M): | |||
self.finger_table.append(RING[bisect_left(RING, ((node_id + (2 ** i)) % (2 ** M))) % len(RING)]) | |||
print(f"Node created! Finger table = {self.finger_table}, [pred, succ] = [{self.predecessor_id}, {self.successor_id}]") | |||
def closest_preceding_node(self, id): | |||
"""Returns node_id of the closest preceeding node (from n.finger_table) for a given id""" | |||
for i in reversed(self.finger_table): | |||
if i == RING[-1]: | |||
idx = bisect_left([RING[0], RING[-1]], id) | |||
if idx == 0 or idx == 2: | |||
return i | |||
elif self.node_id > i: | |||
idx = bisect_left([i, self.node_id], id) | |||
if idx == 1: | |||
return i | |||
else: | |||
if i > self.node_id and i < id: | |||
return i | |||
return self.finger_table[-1] | |||
def find_successor(self, id): | |||
"""Recursive function returning the identifier of the node responsible for a given id""" | |||
if id == self.node_id: | |||
return id | |||
# Note the half-open interval and that L <= R does not necessarily hold | |||
if self.successor_id < self.node_id: | |||
idx = bisect_left([self.successor_id, self.node_id], id) | |||
if idx == 0 or idx == 2: | |||
return self.successor_id | |||
elif id in range(self.node_id, self.successor_id + 1): | |||
return self.successor_id | |||
# Forward the query to the closest preceding node in the finger table for n | |||
n0 = self.closest_preceding_node(id) | |||
print(f'Forwarding request to node {n0}') | |||
with ServerProxy(f'http://node_{n0}:{PORT}') as proxy: | |||
return proxy.find_successor(id) | |||
def put(self, key, value): | |||
"""Stores the given key-value pair in the node responsible for it""" | |||
try: | |||
print(f"put({key}, {value})") | |||
if self.node_id < self.predecessor_id: | |||
idx = bisect_left([self.node_id, self.predecessor_id], key) | |||
if idx == 0 or idx == 2: | |||
return self.store_item(key, value) | |||
elif key in range(self.predecessor_id, self.node_id + 1): | |||
return self.store_item(key, value) | |||
n0 = self.find_successor(key) | |||
if self.node_id == n0: | |||
return self.store_item(key, value) | |||
with ServerProxy(f'http://node_{n0}:{PORT}') as proxy: | |||
return proxy.store_item(key, value) | |||
except Exception as e: | |||
print(f"couldn't put({key}, {value})") | |||
print(traceback.format_exc()) | |||
print(e) | |||
return False | |||
def get(self, key): | |||
"""Gets the value for a given key from the node responsible for it""" | |||
try: | |||
print(f"get({key})") | |||
if self.node_id < self.predecessor_id: | |||
idx = bisect_left([self.node_id, self.predecessor_id], key) | |||
if idx == 0 or idx == 2: | |||
return self.retrieve_item(key) | |||
elif key in range(self.predecessor_id, self.node_id + 1): | |||
return self.retrieve_item(key) | |||
n0 = self.find_successor(key) | |||
if self.node_id == n0: | |||
return self.retrieve_item(key) | |||
with ServerProxy(f'http://node_{n0}:{PORT}') as proxy: | |||
return proxy.retrieve_item(key) | |||
except Exception as e: | |||
print(f"couldn't get({key})") | |||
print(traceback.format_exc()) | |||
print(e) | |||
return -1 | |||
def store_item(self, key, value): | |||
"""Stores a key-value pair into the data store of this node""" | |||
self.table[key] = value | |||
return True | |||
def retrieve_item(self, key): | |||
"""Retrieves a value for a given key from the data store of this node""" | |||
if key in self.table: | |||
return self.table[key] | |||
return -1 | |||
if __name__ == '__main__': | |||
try: | |||
parser = ArgumentParser() | |||
parser.add_argument('node_id') | |||
args = parser.parse_args() | |||
node = Node(int(args.node_id)) | |||
server = SimpleXMLRPCServer(('0.0.0.0', PORT)) | |||
print("Listening on port 1234...") | |||
server.register_function(node.get, "get") | |||
server.register_function(node.put, "put") | |||
server.register_function(node.retrieve_item, "retrieve_item") | |||
server.register_function(node.store_item, "store_item") | |||
server.register_function(node.find_successor, "find_successor") | |||
server.serve_forever() | |||
except KeyboardInterrupt: | |||
print("node killed...") | |||
exit() | |||
@@ -0,0 +1,7 @@ | |||
FROM python:alpine | |||
WORKDIR /app | |||
ENV PYTHONUNBUFFERED=1 | |||
COPY *.py ./ |
@@ -0,0 +1,22 @@ | |||
from xmlrpc.client import ServerProxy | |||
import socket | |||
import time | |||
PORT = 1234 | |||
CLUSTER = [1, 2, 3] | |||
LOGS = ['SET 5', 'ADD 1'] | |||
if __name__ == '__main__': | |||
time.sleep(10) # Wait for leader election processs | |||
print('Client started') | |||
for node_id in CLUSTER: | |||
try: | |||
with ServerProxy(f'http://node_{node_id}:{PORT}') as node: | |||
if node.is_leader(): | |||
print(f"Node {node_id} is the cluster leader. Sending logs") | |||
for log in LOGS: | |||
if node.leader_receive_log(log): | |||
print(f"Leader committed '{log}'") | |||
time.sleep(5) # Wait for entries to propagate | |||
except socket.error as e: | |||
print(f"Failed to connect to node_{node_id}: {e}") |
@@ -0,0 +1,27 @@ | |||
version: '3' | |||
services: | |||
client: | |||
build: . | |||
image: raft | |||
container_name: client | |||
depends_on: [ node_1, node_2, node_3 ] | |||
command: python3 client.py | |||
node_1: | |||
build: . | |||
image: raft | |||
container_name: node_1 | |||
command: sh -c 'python3 node*.py 1' | |||
node_2: | |||
build: . | |||
image: raft | |||
container_name: node_2 | |||
command: sh -c 'python3 node*.py 2' | |||
node_3: | |||
build: . | |||
image: raft | |||
container_name: node_3 | |||
command: sh -c 'python3 node*.py 3' |
@@ -0,0 +1,205 @@ | |||
import random | |||
import sched | |||
import socket | |||
import time | |||
from threading import Thread | |||
from argparse import ArgumentParser | |||
from enum import Enum | |||
from xmlrpc.client import ServerProxy | |||
from xmlrpc.server import SimpleXMLRPCServer | |||
PORT = 1234 | |||
CLUSTER = [1, 2, 3] | |||
ELECTION_TIMEOUT = (6, 8) | |||
HEARTBEAT_INTERVAL = 5 | |||
class NodeState(Enum): | |||
"""Enumerates the three possible node states (follower, candidate, or leader)""" | |||
FOLLOWER = 1 | |||
CANDIDATE = 2 | |||
LEADER = 3 | |||
class Node: | |||
def __init__(self, node_id): | |||
"""Non-blocking procedure to initialize all node parameters and start the first election timer""" | |||
self.node_id = node_id | |||
self.state = NodeState.FOLLOWER | |||
self.term = 0 | |||
self.votes = {} | |||
self.log = [] | |||
self.pending_entry = '' | |||
self.sched = sched.scheduler() | |||
self.event = '' | |||
# TODO: start election timer for this node | |||
self.reset_election_timer() | |||
print(f"Node started! State: {self.state}. Term: {self.term}") | |||
def is_leader(self): | |||
"""Returns True if this node is the elected cluster leader and False otherwise""" | |||
if self.state == NodeState.LEADER: | |||
return True | |||
return False | |||
def reset_election_timer(self): | |||
"""Resets election timer for this (follower or candidate) node and returns it to the follower state""" | |||
self.state = NodeState.FOLLOWER | |||
q = self.sched.queue | |||
for event in q: | |||
self.sched.cancel(event) | |||
#if (self.node_id == 1 or self.node_id == 3): | |||
# self.sched.enter(0, 1, self.hold_election, ()) | |||
# return | |||
self.sched.enter(random.uniform(ELECTION_TIMEOUT[0], ELECTION_TIMEOUT[1]), 1, self.hold_election, ()) | |||
def reset_heartbeat_timer(self): | |||
q = self.sched.queue | |||
for event in q: | |||
self.sched.cancel(event) | |||
self.sched.enter(HEARTBEAT_INTERVAL, 1, self.append_entries, ()) | |||
def hold_election(self): | |||
"""Called when this follower node is done waiting for a message from a leader (election timeout) | |||
The node increments term number, becomes a candidate and votes for itself. | |||
Then call request_vote over RPC for all other online nodes and collects their votes. | |||
If the node gets the majority of votes, it becomes a leader and starts the hearbeat timer | |||
If the node loses the election, it returns to the follower state and resets election timer. | |||
""" | |||
self.term = self.term + 1 | |||
self.state = NodeState.CANDIDATE | |||
self.votes = {} | |||
self.votes[self.node_id] = True | |||
print(f'New election term {self.term}. State: {self.state}') | |||
for n0 in CLUSTER: | |||
if node == self.node_id: | |||
continue | |||
try: | |||
print(f'Requesting vote from node {n0}') | |||
with ServerProxy(f'http://node_{n0}:{PORT}') as proxy: | |||
if proxy.request_vote(self.term, self.node_id): | |||
self.votes[n0] = True | |||
else: | |||
self.votes[n0] = False | |||
except Exception as e: | |||
print(f"couldn't request_vote from {n0}") | |||
print(traceback.format_exc()) | |||
print(e) | |||
if sum(self.votes.values()) > len(CLUSTER) / 2: | |||
self.state = NodeState.LEADER | |||
self.reset_heartbeat_timer() | |||
print(f"New election term {self.term}. State: {self.state}") | |||
def request_vote(self, term, candidate_id): | |||
"""Called remotely when a node requests voting from other nodes. | |||
Updates the term number if the received one is greater than `self.term` | |||
A node rejects the vote request if it's a leader or it already voted in this term. | |||
Returns True and update `self.votes` if the vote is granted to the requester candidate and False otherwise. | |||
""" | |||
print(f"Got a vote request from {candidate_id} (term={term})") | |||
self.reset_election_timer() | |||
if term > self.term: | |||
self.term = term | |||
self.votes = {} | |||
if self.is_leader() or len(self.votes) > 0: | |||
return False | |||
self.votes[candidate_id] = True | |||
return True | |||
def append_entries(self): | |||
"""Called by leader every HEARTBEAT_INTERVAL, sends a heartbeat message over RPC to all online followers. | |||
Accumulates ACKs from followers for a pending log entry (if any) | |||
If the majority of followers ACKed the entry, the entry is committed to the log and is no longer pending | |||
""" | |||
print("Sending a heartbeat to followers") | |||
acks = 0 | |||
for n0 in CLUSTER: | |||
if n0 == self.node_id: | |||
continue | |||
try: | |||
with ServerProxy(f'http://node_{n0}:{PORT}') as proxy: | |||
if proxy.heartbeat(self.pending_entry): | |||
acks = acks + 1 | |||
except Exception as e: | |||
print(f"couldn't heartbeat {n0}") | |||
print(traceback.format_exc()) | |||
print(e) | |||
if self.pending_entry != '' and acks > len(CLUSTER) / 2: | |||
self.log.append(self.pending_entry) | |||
print(f'Leader commited \'{self.pending_entry}\'') | |||
self.pending_entry = '' | |||
self.reset_heartbeat_timer() | |||
def heartbeat(self, leader_entry): | |||
"""Called remotely from the leader to inform followers that it's alive and supply any pending log entry | |||
Followers would commit an entry if it was pending before, but is no longer now. | |||
Returns True to ACK the heartbeat and False on any problems. | |||
""" | |||
print(f"Heartbeat received from leader (entry='{leader_entry}')") | |||
try: | |||
self.reset_election_timer() | |||
if self.pending_entry != '' and leader_entry != self.pending_entry: | |||
self.log.append(self.pending_entry) | |||
print(f'Follower commited \'{self.pending_entry}\'') | |||
self.pending_entry = leader_entry | |||
return True | |||
except Exception as e: | |||
return False | |||
def leader_receive_log(self, log): | |||
"""Called remotely from the client. Executed only by the leader upon receiving a new log entry | |||
Returns True after the entry is committed to the leader log and False on any problems | |||
""" | |||
print(f"Leader received log \'{log}\' from client") | |||
while self.pending_entry != '': | |||
time.sleep(1) | |||
self.pending_entry = log | |||
time.sleep(7) | |||
if self.pending_entry == '' and self.log[-1] == log: | |||
return True | |||
return False | |||
if __name__ == '__main__': | |||
# TODO: Parse one integer argument (node_id), then create the node with that ID. | |||
# TODO: Start RPC server on 0.0.0.0:PORT and expose the node instance | |||
# TODO: Run the node scheduler in an isolated thread. | |||
# TODO: Handle KeyboardInterrupt and terminate gracefully. | |||
try: | |||
parser = ArgumentParser() | |||
parser.add_argument('node_id') | |||
args = parser.parse_args() | |||
node = Node(int(args.node_id)) | |||
t = Thread(target=node.sched.run) | |||
t.start() | |||
server = SimpleXMLRPCServer(('0.0.0.0', PORT), logRequests=False) | |||
print(f"Listening on port {PORT}...") | |||
server.register_function(node.leader_receive_log, "leader_receive_log") | |||
server.register_function(node.heartbeat, "heartbeat") | |||
server.register_function(node.request_vote, "request_vote") | |||
server.register_function(node.is_leader, "is_leader") | |||
server.serve_forever() | |||
except KeyboardInterrupt: | |||
print("node killed...") | |||
exit() | |||