目录
- 连接池实现
- 使用连接池
连接池实现
socket_pool.py
-*- coding:utf-8 -*-import socketimport timeimport threadingimport osimport loggingimport tracebackfrom queue import Queue, Empty_logger = logging.getLogger(‘mylogger’)class SocketPool: def __init__(self, host, port, min_connections=10, max_connections=10): ”’ 初始化Socket连接池 :param host: 目标主机地址 :param port: 目标端口号 :param min_connections: 最小连接数 :param max_connections: 最大连接数 ”’ self.host = host self.port = port self.min_connections = min_connections self.max_connections = max_connections self.busy_sockets_dict = } 存放从连接池取出的socket的id self._sock_lock = threading.Lock() 线程锁保证计数正确 self._pool = Queue(max_connections) 基于线程安全的队列存储连接 self._lock = threading.Lock() 线程锁保证资源安全: self._init_pool() 预创建连接 self._start_health_check() 启动连接健壮检查线程 def _init_pool(self): ”’预创建连接并填充到池中”’ for _ in range(self.min_connections): sock = self._create_socket() self._pool.put(sock) def _create_socket(self): ”’创建新的Socket连接”’ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.connect((self.host, self.port)) return sock except socket.error as e: raise ConnectionError(f’Failed to connect: e}’) 连接失败抛出异常 def _start_health_check(self): ”’启动后台线程定期检查连接有效性”’ def check(): while True: with self._lock: for _ in range(self._pool.qsize()): sock = self._pool.get() self.busy_sockets_dict[sock] = 1 try: sock.send(b’PING<END>’) 发送心跳包验证连接情形 下面内容 11 为服务端返回数据字节长度,不能乱写,否则会导致获取非健壮检查响应报文数据存在多余内容,不符合格式,从而导致数据解析难题 sock.recv(11) self._pool.put(sock) self.busy_sockets_dict.pop(sock) except (socket.error, ConnectionResetError): _logger.error(‘socket连接健壮检查出错:%s, 关闭失效连接并创建新连接替换’ % traceback.format_exc()) sock.close() 关闭失效连接并创建新连接替换 self.busy_sockets_dict.pop(sock) new_sock = self._create_socket() self._pool.put(new_sock) 如果sock数量小于最小数量,则补充 for _ in range(0, self.min_connections – self._pool.qsize()): new_sock = self._create_socket() self._pool.put(new_sock) time.sleep(60) 每60秒检查一次 threading.Thread(target=check, daemon=True).start() def get_connection(self): ”’ 从池中获取一个可用连接 :return: socket对象 ”’ with self._sock_lock: if self._pool.empty(): if len(self.busy_sockets_dict.keys()) < self.max_connections: new_sock = self._create_socket() self.busy_sockets_dict[new_sock] = 1 return new_sock else: raise Empty(‘No available connections in pool’) else: try: sock = self._pool.get(block=False) self.busy_sockets_dict[sock] = 1 return sock except Exception: _logger.error(‘获取socket连接出错:%s’ % traceback.format_exc()) raise def release_connection(self, sock): ”’ 将连接归还到池中 :param sock: 待归还的socket对象 ”’ if not sock._closed: self._pool.put(sock) if sock in self.busy_sockets_dict: self.busy_sockets_dict.pop(sock) def close_all(self): ”’关闭池中所有连接”’ while not self._pool.empty(): sock = self._pool.get() sock.close() self.busy_sockets_dict.pop(sock.id) self.busy_sockets_dict = } 兜底host = os.environ.get(‘MODBUS_TCP_SERVER_HOST’, ‘127.0.0.1’)port = int(os.environ.get(‘MODBUS_TCP_SERVER_PORT’, ‘9000’))min_connections = int(os.environ.get(‘DJANGO_SOCKET_POOL_MAX_CONNECTIONS’, ’10’))max_connections = int(os.environ.get(‘DJANGO_SOCKET_POOL_MAX_CONNECTIONS’, ‘100’))socketPool = SocketPool(host, port, min_connections, max_connections)
使用连接池
from socket_pool import socketPooldef send_socket_msg(data): global socketPool try: sock = None 获取连接(支持超时控制) sock = socketPool.get_connection() 发送数据 sock.sendall(data.encode(‘utf-8’)) except Exception: error_msg = ‘发送消息出错:%s’ % traceback.format_exc() _logger.error(error_msg) if sock is not None: sock.close() socketPool.release_connection(sock) return send_socket_msg(data) response = ” try: while True: chunk = sock.recv(4096) chunk = chunk.decode(‘utf-8’) response += chunk if response.endswith(‘<END>’): response = response.rstrip(‘<END>’) return ‘success’:True, ‘message’:response} except Exception: error_msg = ‘获取消息出错:%s’ % traceback.format_exc() _logger.error(error_msg) return ‘success’:False, ‘message’: error_msg} finally: 必须归还连接! socketPool.release_connection(sock)
到此这篇关于Python 基于队列实现 tcp socket 连接池的文章就介绍到这了,更多相关Python tcp socket 连接池内容请搜索风君子博客以前的文章或继续浏览下面的相关文章希望大家以后多多支持风君子博客!
无论兄弟们可能感兴趣的文章:
- Pythonsocket怎样实现服务端和客户端数据传输(TCP)
- python用socket实现协议TCP长连接框架
- Python基于socket实现TCP/IP客户和服务器通信
- Python使用socket模块实现简单tcp通信
- python 使用raw socket进行TCP SYN扫描实例
- python基于socket实现的UDP及TCP通讯功能示例
- python使用socket实现的传输demo示例基于TCP协议
- Python中的TCP socket写法示例
- python使用socket创建tcp服务器和客户端
- Python封装数据库连接池详解
- Python3 多线程(连接池)操作MySQL插入数据
- python连接池实现示例程序