RPC架构设计及IO模型
本文阐述了socket编程、IO网络模型,以及各种IO模型的适用场景。
RPC架构设计
:::tip 文章更新历史
2022/05/10 feat:新增GO、C++、C支持
2022/05/10 feat:新增PHP支持
2022/05/09 feat:新增Scala、Grovvy、GraalVMJS支持
2022/05/09 feat:新增C#、F#支持
2022/05/09 feat:新增JRuby支持
2022/05/09 feat:新增Kotlin支持、Jython支持
2022/05/04 feat:修改相关描述。
2022/03/01 feat:初稿。
:::
socket
socket网络编程
socket概述
socket套接字是两台主机之间逻辑连接的端点。
TCP/IP协议是传输层协议,主要解决数据在网络中的传输
socket是网络通信之间的抽象接口,它包含网络通信的五种基础信息:连接使用的协议、本地主机的ip地址、本地进程协议端口、远程主机ip地址、远程进程的协议端口。
socket整体流程
socket编程主要包括客户端和服务端两个方面。
首先在服务端创建一个服务端套接字(ServerSocket),并把它附加到一个端口上,服务端从这个端口监听链接。
端口范围是 0-65536
,注意 0-1024
是特权服务保留的端口。可选择任意一个不被其他进程使用的端口。
客户端请求与服务端连接时,根据服务端的域名或者ip地址,加上端口号,打开一个套接字。当服务器接受连接后,服务器和客户端之间的操作可以像输入输出流一样操作。
代码实现
服务端代码
/** * 服务端 * * @name: ServerDemo * @author: terwer * @date: 2022-05-08 14:20 */ object ServerDemo { @Throws(IOException::class) @JvmStatic fun main(args: Array<String>) { // 1.创建一个线程池,如果有客户端链接就创建一个线程与之通信 val executorService = Executors.newCachedThreadPool() // 2.创建ServerSocket val serverSocket = ServerSocket(9999) println("服务器已启动") while (true) { // 3.监听客户端 val socket = serverSocket.accept() println("有客户端链接") executorService.execute { handle(socket) } } } private fun handle(socket: Socket) { try { println("线程ID:" + Thread.currentThread().id + ",线程名称:" + Thread.currentThread().name) // 从连接中取出输入流 val inputStream = socket.getInputStream() val b = ByteArray(1024) val read = inputStream.read(b) println("客户端发过来的消息:" + String(b, 0, read)) // 链接中取出输出流并回话 val outputStream = socket.getOutputStream() outputStream.write("没有".toByteArray()) } catch (e: Exception) { e.printStackTrace() } finally { try { socket.close() } catch (e: IOException) { e.printStackTrace() } } } }
/** * 服务端 * * @name: ServerDemo * @author: terwer * @date: 2022-04-17 14:20 **/ public class ServerDemo { public static void main(String[] args) throws IOException { // 1.创建一个线程池,如果有客户端链接就创建一个线程与之通信 ExecutorService executorService = Executors.newCachedThreadPool(); // 2.创建ServerSocket ServerSocket serverSocket = new ServerSocket(9999); System.out.println("服务器已启动"); while (true) { // 3.监听客户端 Socket socket = serverSocket.accept(); System.out.println("有客户端链接"); executorService.execute(new Runnable() { @Override public void run() { handle(socket); } }); } } private static void handle(Socket socket) { try { System.out.println("线程ID:" + Thread.currentThread().getId() + ",线程名称:" + Thread.currentThread().getName()); // 从连接中取出输入流 InputStream inputStream = socket.getInputStream(); byte[] b = new byte[1024]; int read = inputStream.read(b); System.out.println("客户端" + new String(b, 0, read)); // 链接中取出输出流并回话 OutputStream outputStream = socket.getOutputStream(); outputStream.write("没有".getBytes()); } catch (Exception e) { e.printStackTrace(); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
# -*- coding: utf-8 -*- from java.util.concurrent import Executors from java.net import ServerSocket from java.lang import String import jarray import sys # Jython版socket服务端 # # @name: ServerDemo # @author: terwer # @date: 2022-05-09 15:48 class ServerDemo(object): @classmethod def main(self, args): executorService = Executors.newCachedThreadPool() serverSocket = ServerSocket(9999) print("server is running") while True: socket = serverSocket.accept() # lambda <<argument(s)>> : <<function body>> # name_combo = lambda first, last: first + ' ' + last # name_combo('Jim','Baker') runnable_lambda_fun = lambda: self.handle(socket) executorService.execute(runnable_lambda_fun) @classmethod def handle(self, socket): try: print("receiced socket here=>") print(socket) inputStream = socket.getInputStream() b = jarray.zeros(1024, "b") read = inputStream.read(b) in_str = String(b, 0, read) print("receice msg from client=>") print(in_str) outputStream = socket.getOutputStream() out_bytes = map(ord, "msg from jython socket server") outputStream.write(out_bytes) except Exception as e: print("An error occured:") print(e) finally: try: socket.close() except IOError as e: print("An error occured:") print(e) if __name__ == "__main__": ServerDemo.main(sys.argv)
require 'java' java_import 'java.util.concurrent.Executors' java_import 'java.net.ServerSocket' java_import 'java.net.SocketInputStream' java_import 'java.lang.Byte' # JRuby版socket服务端 # # https://github.com/jruby/jruby/wiki/JRuby-Reference#Arrays # # @name: ServerDemo # @author: terwer # @date: 2022-05-09 21:04 class ServerDemo def self.main() executorService = Executors.newCachedThreadPool() serverSocket = ServerSocket.new(9999) puts("jbury server is running...") while (true) socket = serverSocket.accept() runnable_lambda_fun = -> { self.handle(socket) } executorService.execute(runnable_lambda_fun) end end def self.handle(socket) begin print("receiced socket here=>") puts(socket) inputStream = socket.getInputStream() b = Java::byte[1024].new read = inputStream.read(b) puts "received msg from client=> #{b}" outputStream = socket.getOutputStream() out_bytes = "server msg send from jruby".to_java_bytes outputStream.write(out_bytes) rescue => e socket.close() puts "Exception Occurred #{e.class}. Message: #{e.message}. Backtrace: \n #{e.backtrace.join("\n")}" Rails.logger.error "Exception Occurred #{e.class}. Message: #{e.message}. Backtrace: \n #{e.backtrace.join("\n")}" end end end ServerDemo.main()
/** * 服务端 * * @name: ServerDemo * @author: terwer * @date: 2022-05-09 14:20 * */ object ServerDemo { @throws[IOException] def main(args: Array[String]) = { // 1.创建一个线程池,如果有客户端链接就创建一个线程与之通信 val executorService = Executors.newCachedThreadPool // 2.创建ServerSocket val serverSocket = new ServerSocket(9999) System.out.println("服务器已启动") while ( { true }) { // 3.监听客户端 val socket = serverSocket.accept System.out.println("有客户端链接") executorService.execute(() => handle(socket)) } } private def handle(socket: Socket) = try { System.out.println("线程ID:" + Thread.currentThread.getId + ",线程名称:" + Thread.currentThread.getName) // 从连接中取出输入流 val inputStream = socket.getInputStream val b = new Array[Byte](1024) val read = inputStream.read(b) System.out.println("客户端发过来的消息:" + new String(b, 0, read)) // 链接中取出输出流并回话 val outputStream = socket.getOutputStream outputStream.write("没有".getBytes) } catch { case e: Exception => e.printStackTrace() } finally try socket.close() catch { case e: IOException => e.printStackTrace() } }
/** * 服务端 * * @name: ServerDemo * @author: terwer * @date: 2022-05-09 14:20 * */ class ServerDemo { static void main(String[] args) throws IOException { // 1.创建一个线程池,如果有客户端链接就创建一个线程与之通信 ExecutorService executorService = Executors.newCachedThreadPool() // 2.创建ServerSocket ServerSocket serverSocket = new ServerSocket(9999) System.out.println('服务器已启动') while (true) { // 3.监听客户端 Socket socket = serverSocket.accept() System.out.println('有客户端链接') executorService.execute(() -> handle(socket)) } } private static void handle(Socket socket) { try { System.out.println('线程ID:' + Thread.currentThread().id + ',线程名称:' + Thread.currentThread().name) // 从连接中取出输入流 InputStream inputStream = socket.inputStream byte[] b = new byte[1024] int read = inputStream.read(b) System.out.println('客户端发过来的消息:' + new String(b, 0, read)) // 链接中取出输出流并回话 OutputStream outputStream = socket.outputStream outputStream.write('没有'.bytes) } catch (Exception e) { e.printStackTrace() } finally { try { socket.close() } catch (IOException e) { e.printStackTrace() } } } }
<?php namespace com\terwergreen\php; /** * 服务端 * * @name: ServerDemo * @author: terwer * @date: 2022-05-10 14:20 **/ class ServerDemo { public static function main($args) { $addr = "127.0.0.1"; $port = 9999; // 创建server_socket $server_socket = socket_create(AF_INET, SOCK_STREAM, 0); // 绑定端口 socket_bind($server_socket, $addr, $port); // 监听 socket_listen($server_socket); echo "php版socket服务器启动成功\n"; while (true) { $socket = socket_accept($server_socket); self::handle($socket); } } public static function handle($socket) { echo "处理客户端连接\n"; $msg = socket_read($socket, 1024, PHP_BINARY_READ); echo "来自客户端的消息:$msg"; $out_msg = "这是php版socket服务器发送过来的消息"; socket_write($socket, $out_msg); socket_close($socket); } } $args = []; ServerDemo::main($args);
package main import ( "bytes" "fmt" "net" ) /* * 服务端 * * @name: ServerDemo * @author: terwer * @date: 2022-05-09 23:14 **/ func main() { listen, _ := net.Listen("tcp", ":9999") fmt.Println("go的socket服务器已启动") for true { conn, _ := listen.Accept() handle(conn) } } func handle(conn net.Conn) { fmt.Println("处理客户端连接") // 收消息 b := make([]byte, 1024) read, err := conn.Read(b) if err != nil { return } var dataBuffer bytes.Buffer dataBuffer.Write(b[:read]) fmt.Println("接收到来自客户端的消息:") fmt.Print(dataBuffer.String()) // 发消息 var str string = "这是来着go的socket服务端发过来的消息" var outBytes []byte = []byte(str) conn.Write(outBytes) // 关闭链接 conn.Close() }
#include <iostream> #include <sys/socket.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <time.h> int main() { int serv_sock = 0, sock = 0; struct sockaddr_in serv_addr; char sendBuff[2015]; time_t ticks; // 创建socket serv_sock = socket(AF_INET, SOCK_STREAM, 0); if (serv_sock < 0) { std::cout << "创建socket失败"; exit(1); } memset(&serv_addr, '0', sizeof(serv_addr)); memset(sendBuff, '0', sizeof(sendBuff)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);// htonl(inet_addr("127.0.0.1"));// 两种效一样 serv_addr.sin_port = htons(9999); // 绑定ip与端口 bind(serv_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)); // 开启监听 listen(serv_sock, 10); std::cout << "C++版socket服务器已启动!" << std::endl; while (true) { try { // 接收消息 sock = accept(serv_sock, (sockaddr *) NULL, NULL); // 发送消息 snprintf(sendBuff, sizeof(sendBuff), "这是从C++版的socket服务器发送过来的消息:%.24s\r\n", ctime(&ticks)); write(sock, sendBuff, strlen(sendBuff)); // 关闭 // close(server_socket); close(sock); sleep(1); } catch (...) { std::cout << "服务端异常"; break; } } }
#include <stdio.h> #include <sys/types.h> #include <sys/socket.h> #include <stdlib.h> #include <string.h> #include <netinet/in.h> #include <stdbool.h> #include <time.h> #include <unistd.h> int main() { int serv_sock = 0, sock = 0; struct sockaddr_in serv_addr; char sendBuff[2015]; time_t ticks; // 创建socket serv_sock = socket(AF_INET, SOCK_STREAM, 0); if (serv_sock < 0) { exit(1); } memset(&serv_addr, '0', sizeof(serv_addr)); memset(sendBuff, '0', sizeof(sendBuff)); serv_addr.sin_family = AF_INET; // serv_addr.sin_addr.s_addr = INADDR_ANY; serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // serv_addr.sin_port = 9999; serv_addr.sin_port = htons(9999); // 绑定ip与端口 bind(serv_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)); // 开启监听 listen(serv_sock, 10); printf("C语言版的socket服务器启动成功\n"); while (true) { // 接收消息 sock = accept(serv_sock, (struct sockaddr *) NULL, NULL); // 发送消息 snprintf(sendBuff, sizeof(sendBuff), "这是从C语言版的socket服务器发送过来的消息:%.24s\r\n", ctime(&ticks)); write(sock, sendBuff, strlen(sendBuff)); // 关闭 // close(server_socket); close(sock); sleep(1); } }
using System.Net; using System.Net.Sockets; using System.Text; namespace SocketDemo; /** * https://docs.microsoft.com/en-us/dotnet/framework/network-programming/asynchronous-server-socket-example */ public class ServerDemo{ public static void Main(string[] args){ var serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); serverSocket.Bind(new IPEndPoint(IpToInt("127.0.0.1"), 9999)); serverSocket.Listen(); Console.WriteLine("C#版Socket服务端已启动"); while (true) { Socket socket = serverSocket.Accept(); ThreadPool.QueueUserWorkItem((state => { DoHandle(socket); })); } } private static void DoHandle(Socket socket){ Console.WriteLine("处理客户端请求"); Console.WriteLine(socket); byte[] in_bytes = new byte[1024]; int len = socket.Receive(in_bytes); var in_str = Encoding.UTF8.GetString(in_bytes, 0, len); Console.WriteLine("接收到来着客户端的数据=>" + in_str); byte[] out_bytes = Encoding.UTF8.GetBytes("从C#服务端发送给客户端的数据"); socket.Send(out_bytes); } private static uint IpToInt(string addr){ return BitConverter.ToUInt32(IPAddress.Parse(addr).GetAddressBytes(), 0); } }
namespace global open System.Net open System.Net.Sockets open System.Text namespace SocketDemo open System open System.Threading type ServerDemo() = static member Main(args: string []) = let mutable serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp) let mutable addr = IPAddress.Parse("127.0.0.1") let mutable endPoint = new IPEndPoint(addr, 9999) serverSocket.Bind(endPoint) serverSocket.Listen() Console.WriteLine("F#版Socket服务端已启动") while true do let mutable (socket: Socket) = serverSocket.Accept() ThreadPool.QueueUserWorkItem(fun state -> ServerDemo.DoHandle(socket)) |> ignore () static member private DoHandle(socket: Socket) = Console.WriteLine("处理客户端请求") Console.WriteLine(socket) let mutable (in_bytes: byte []) = Array.zeroCreate 1024 let mutable (len: int) = socket.Receive(in_bytes) let mutable in_str = Encoding.UTF8.GetString(in_bytes, 0, len) Console.WriteLine("接收到来着客户端的数据=>" + in_str) let mutable (out_bytes: byte []) = Encoding.UTF8.GetBytes("从F#服务端发送给客户端的数据") socket.Send(out_bytes) |> ignore module ServerDemo__run = [<EntryPoint>] let main args = ServerDemo.Main(args) 0
// https://github.com/oracle/graaljs/blob/master/docs/user/FAQ.md const Executors = Java.type("java.util.concurrent.Executors") const ServerSocket = Java.type("java.net.ServerSocket") const Byte = Java.type("java.lang.Byte") const String = Java.type("java.lang.String") const serverSocket = new ServerSocket(9999) console.log("Node.js版socket服务器已启动.") while (true) { const socket = serverSocket.accept() handle(socket) } function handle(socket) { try { console.log("处理客户端链接") const inputStream = socket.getInputStream() var b = java.lang.reflect.Array.newInstance(java.lang.Byte.TYPE, 1024) const read = inputStream.read(b) const in_bytes = new String(b, 0, read) console.log("接收到来着客户端的消息", in_bytes) let outputSream = socket.getOutputStream() // TODO:中文乱码,NodeJS的Bug,https://juejin.cn/post/6981797254705184798 const out_bytes = Buffer.from("msg from graaljs socket server", "utf8") outputSream.write(out_bytes) } catch (e) { console.log("系统异常:", e) } finally { if (null != socket) { socket.close() } } }
(ns com.terwergreen.clojure.ServerDemo (:import java.util.concurrent.Executors) (:import java.net.ServerSocket) ) ;定义main函数 (defn -main "main" [] ;创建线程池 (def executorService (Executors/newCachedThreadPool)) ;(println executorService) ;socket通信 (def serverSocket (new ServerSocket 9999)) (println serverSocket) ;启动服务器 (println "clojore socker server is runnning...") ;(.println (System/out) "clojore socker server is runnning...") (while true (def socket (.accept serverSocket)) ;Runnable线程 (def run_fun (fn [socket] (println "message"))) ;放入线程池 ;(def call_run_fun (run_fun socket)) ;TODO:lambda传参有问题 (.execute executorService call_run_fun) ) ) ;(defn handle [socket] (print "Hello"))
客户端代码
/** * 客户端 * * @name: ClientDemo * @author: terwer * @date: 2022-04-17 15:30 **/ public class ClientDemo { public static void main(String[] args) throws IOException { while (true) { // 1.创建客户端socket Socket s = new Socket("127.0.0.1", 9999); // 2.从连接中获取输出流并发送消息 OutputStream os = s.getOutputStream(); System.out.println("请输入:"); Scanner sc = new Scanner(System.in); String msg = sc.nextLine(); os.write(msg.getBytes()); // 3.从连接中取出输入流并接受会话 InputStream is = s.getInputStream(); byte[] b = new byte[1024]; // 下面写法错了 // int read = is.read(); // 应该是 int read = is.read(b); System.out.println("老板说:" + new String(b, 0, read).trim()); s.close(); } } }
参考:
https://www.thegeekstuff.com/2011/12/c-socket-programming/
IO模型
IO模型说明
简单理解:用什么样的通道进行数据的发送和接收。在很大程度上决定了程序通信的性能。
Java支持三种网络编程模型I/O模式:BIO(同步阻塞)、NIO(同步非阻塞)、AIO(异步非阻塞)
阻塞与非阻塞
指的是网络IO的线程是否处于阻塞或者等待状态
线程访问资源,该资源是否准备就绪的一种处理方式
同步和异步
指的是数据的请求方式,同步和异步是请求数据的一种方式。
BIO(同步阻塞)
Java BIO就是传统的socket编程
BIO(blocking IO):同步阻塞,服务器实现方式为一个链接一个线程,即客户端有一个链接时,服务端就要启动一个线程进行处理,如果这个链接不作任何事情,会造成不必要的线程开销,可以通过线程池改善,实现多个客户端链接服务器。
工作机制
生活中的例子:
BIO的问题分析
- 每个请求都要创建独立的线程,与对应的客户端进行read,业务处理,数据write
- 并发量大的时候,需要创建大量的线程来处理链接,系统资源占用较大
- 链接建立后,如果当前线程没有数据可读,线程就阻塞在read上,造成线程资源浪费
NIO(同步非阻塞)
同步非阻塞,服务器实现模式为一个线程处理多个请求(链接),客户端发送的链接请求会注册到多路复用器上,多路复用器轮训到链接有IO请求就进行处理。
生活中的例子:
AIO(异步非阻塞)
AIO引入了异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程。
他的特点是先由操作系统完成后才通知服务端启动线程去处理,一般适用于连接数较多且连接时间较长的应用。
Proactor模式是一个消息异步通知的模式,Proactor通知的不是就绪事件,而是操作完成事件,这也是操作系统异步IO的主要模型。
https://www.zhihu.com/question/26943938
生活中的例子:
BIO、NIO、AIO的适用场景分析
- BIO(同步阻塞模式)适用于连接数比较小,且固定的架构。对服务器资源的要求比较高,并发局限于应用中,jdk1.4以前的唯一选择,代码容易理解。
- NIO(同步非阻塞模式)适用于链接数目多且链接比较短(轻操作)的架构,比如聊天服务器、弹幕系统、服务器之间的通讯等。编程比较复杂,jdk1.4开始支持。
- AIO(异步非阻塞模式)适用于连接数目比较多并且连接时间比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作。编程比较复杂,jdk1.7开始支持。