1. 서버
- 패키지 설치는 아래의 명령어로 한다.
pip3 install Flask-SocketIO==5.1.2 python-engineio==4.3.2 python-socketio==5.6.0 eventlet
- 서버 코드는 다음과 같이 작성한다.
from flask import Flask from flask_socketio import SocketIO import datetime import eventlet app = Flask(__name__) socketio = SocketIO(app) socketio.init_app(app, cors_allowed_origins="*") @app.route('/') def hello(): return '<h1>This is site</h1>' def send_count_in_intervals(): count = 0 while True: socketio.sleep(1) socketio.emit('message', { 'time': datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"), 'count': count }) count += 1 if __name__ == '__main__': thread = socketio.start_background_task(send_count_in_intervals) eventlet.wsgi.server(eventlet.listen(('', 9999)), app)
- 코드를 실행하면, 다음과 같이 로그를 기록한다.
2. 클라이언트
- 패키지 설치는 아래의 명령어로 한다.
pip3 install python-socketio
- 클라이언트 코드는 다음과 같이 작성한다.
import socketio sio = socketio.Client() sio.connect('ws://127.0.0.1:9999') @sio.on('message') def receive_message(message): print('message', message)
- 코드를 실행하면, 다음과 같이 로그를 기록한다.
3. 포스트맨 테스트
- WebSocket Request를 생성한다.
- Raw → Socket.IO로 변경한다.
- 접속 주소는 ws 혹은 wss 프로토콜로 적절한 주소를 입력한다.
- Events에 적절한 이름을 입력하고, Listen on connect를 활성화한다.
- 설정이 완료된 후, 연결을 해서 접속을 테스트해본다.