远程过程调用训练 晓风の个人博客

问题描述

远程过程调用(RPC)将网络编程变得非常简单。根据所学的RPC相关原理,实现客户端-服务器通信,并进行简单的计算如数据库查询、算术计算、数据挖掘、深度学习推导等。

要求

  1. 采用gRPC
  2. 采用Protobuf作为C-S数据传输格式
  3. 服务器端采用线程池,支持并发
  4. 支持至少两种的计算服务如简单的算术运算+数据挖掘算法(K-means、KNN等)
  5. 编程语言不做要求

建议

gRPC和Protobuf在配置环境时可能有些复杂, 对于有些编程语言如C++、go等会有一些挑战, Python问题会少一些。

解决方案

人生苦短,选择Python。

开发环境

硬件

所用机器型号为VAIO Z Flip 2016

  • Intel(R) Core(TM) i7-6567U CPU @3.30GHZ 3.31GHz
  • 8.00GB RAM

软件

  • Windows 10, 64-bit (Build 17763) 10.0.17763
  • Visual Studio Code 1.39.2
    • Python 2019.10.41019:九月底发布的VSCode Python插件支持在编辑器窗口内原生运行juyter nootbook了,非常赞!
    • Remote - WSL 0.39.9:配合WSL,在Windows上获得Linux接近原生环境的体验。
  • Windows Subsystem for Linux [Ubuntu 18.04.2 LTS]:WSL是以软件的形式运行在Windows下的 Linux子系统,是近些年微软推出来的新工具,可以在Windows系统上原生运行Linux。
    • Python 3.7.4 64-bit (‘anaconda3’:virtualenv):安装在WSL中。
安装gRPCpython环境
python -m pip install grpcio grpcio-tools

源代码

这里实现了简单的求阶乘运算和基于sklearn实现的KNN算法。

wuk_hw3.proto

syntax = "proto3";
service Greeter {
    rpc KNN(KNNargs) returns (Vector) {}
    rpc CalculateFactorial(Int32) returns (Int32) {}
}
message Vector {
    repeated float value = 1;
}
message Matrix {
    repeated Vector value=1;
}
message KNNargs {
    Vector train_y = 1;
    Matrix train_x = 2;
    Matrix test_x = 3;
}
message Int32 {
    int32 value = 1;
}

wuk_hw3_server.py

from sklearn.neighbors import KNeighborsClassifier
import numpy as np
from concurrent import futures
import logging
import grpc
import wuk_hw3_pb2
import wuk_hw3_pb2_grpc


class Greeter(wuk_hw3_pb2_grpc.GreeterServicer):
    def KNN(self, request, context):
        testx, trainx, trainy = [], [], []
        for row in request.test_x.value:
            tmp = []
            for it in row.value:
                tmp.append(it)
            testx.append(tmp)
        for row in request.train_x.value:
            tmp = []
            for it in row.value:
                tmp.append(it)
            trainx.append(tmp)
        for it in request.train_y.value:
            trainy.append(it)
        knn = KNeighborsClassifier()
        knn.fit(np.array(trainx), np.array(trainy))
        return wuk_hw3_pb2.Vector(value=knn.predict(np.array(testx)).tolist())

    def CalculateFactorial(self, request, context):
        ans = 1
        for i in range(request.value):
            ans = ans*(i+1)
        return wuk_hw3_pb2.Int32(value=ans)


if __name__ == '__main__':
    logging.basicConfig()
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    wuk_hw3_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

wuk_hw3_client.py

from __future__ import print_function
from sklearn.model_selection import train_test_split
from sklearn import datasets
import logging
import grpc
import wuk_hw3_pb2
import wuk_hw3_pb2_grpc

if __name__ == '__main__':
    logging.basicConfig()
    stub = wuk_hw3_pb2_grpc.GreeterStub(
        grpc.insecure_channel('localhost:50051'))
    iris = datasets.load_iris()
    X_train, X_test, Y_train, Y_test = train_test_split(iris.data, iris.target)
    X_train, Y_train, trainx, testx = X_train.tolist(), Y_train.tolist(), [], []
    for row in X_train:
        trainx.append(wuk_hw3_pb2.Vector(value=row))
    for row in X_test:
        testx.append(wuk_hw3_pb2.Vector(value=row))
    print("using knn to test iris\'dataset: ", X_test.tolist())
    res = stub.KNN(wuk_hw3_pb2.KNNargs(
        train_x=wuk_hw3_pb2.Matrix(value=trainx), train_y=wuk_hw3_pb2.Vector(value=Y_train),
        test_x=wuk_hw3_pb2.Matrix(value=testx)))
    res_list = []
    for ele in res.value:
        res_list.append(ele)
    print("result: ", res_list)
    fac = stub.CalculateFactorial(wuk_hw3_pb2.Int32(value=9))
    print("9! = ", str(fac.value))

实验结果

编译Protobuf

运行下述代码之后,目录下生成wuk_hw3_pb2.pywuk_hw3_pb2_grpc.py两个文件。

python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. wuk_hw3.proto

服务端

运行下述代码,启动服务器进程。

python wuk_hw3_server.py

客户端

新开终端运行下述代码,启动客户端和服务器进行通信。可以看到,成功输出了在sklearn的样例数据集上跑KNN算法的结果和九的阶乘。

$ python wuk_hw3_client.py
using knn to test iris'dataset:  [[7.1, 3.0, 5.9, 2.1], [5.7, 3.0, 4.2, 1.2], [5.9, 3.0, 5.1, 1.8], [5.7, 2.8, 4.5, 1.3], [7.7, 3.0, 6.1, 2.3], [6.5, 3.2, 5.1, 2.0], [5.2, 3.4, 1.4, 0.2], [5.6, 2.9, 3.6, 1.3], [4.6, 3.1, 1.5, 0.2], [5.5, 3.5, 1.3, 0.2], [6.0, 3.4, 4.5, 1.6], [6.8, 3.0, 5.5, 2.1], [7.9, 3.8, 6.4, 2.0], [6.1, 3.0, 4.9, 1.8], [6.8, 3.2, 5.9, 2.3], [6.0, 3.0, 4.8, 1.8], [6.2, 2.8, 4.8, 1.8], [6.3, 2.5, 5.0, 1.9], [6.7, 3.3, 5.7, 2.1], [6.3, 2.9, 5.6, 1.8], [4.9, 3.1, 1.5, 0.1], [7.7, 3.8, 6.7, 2.2], [6.0, 2.2, 4.0, 1.0], [4.4, 2.9, 1.4, 0.2], [5.8, 2.7, 3.9, 1.2], [4.9, 2.4, 3.3, 1.0], [4.5, 2.3, 1.3, 0.3], [4.9, 3.0, 1.4, 0.2], [5.8, 2.7, 5.1, 1.9], [5.0, 3.4, 1.6, 0.4], [5.7, 2.8, 4.1, 1.3], [6.0, 2.7, 5.1, 1.6], [6.3, 2.8, 5.1, 1.5], [4.8, 3.1, 1.6, 0.2], [5.1, 2.5, 3.0, 1.1], [5.1, 3.8, 1.5, 0.3], [6.6, 2.9, 4.6, 1.3], [6.7, 3.0, 5.2, 2.3]]
result:  [2.0, 1.0, 2.0, 1.0, 2.0, 2.0, 0.0, 1.0, 0.0, 0.0, 1.0, 2.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 2.0, 2.0, 0.0, 2.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 2.0, 0.0, 1.0, 2.0, 1.0, 0.0, 1.0, 0.0, 1.0, 2.0]
9! =  362880

遇到的问题及解决方法

只要代码能跑起来,很多难题慢慢都会得到解决。参考官方文档,写了第一个gRPCHelloWorld示例,随后要求的功能均在此之上一点点增量开发出来。