V2EX = way to explore
V2EX 是一个关于分享和探索的地方
已注册用户请  登录
V2EX  ›  qianchengv  ›  全部回复第 1 页 / 共 1 页
回复总数  11
vivo x200 pro
12 天前
回复了 qianchengv 创建的主题 Apple 分享一下苹果教育优惠闲鱼链接
从 Mac mini output 换 C 口的线连显示器可以做到秒级
12 天前
回复了 Meld 创建的主题 Apple 刚刚,天猫购买的教育优惠 Mac mini 丐版发货了
13 天前
回复了 qianchengv 创建的主题 小米 小米手机关机之后一星期就亏电无法开机
去了益田假日,Genius 表示大概率可以现场更换镜头,但检测后表示系统无法门店更换镜头,现已返厂维修,预计需要 15 天(大概率是因为我在 pdd 百补买的 iPhone PM )。
54 天前
回复了 smdbh 创建的主题 Python 请教一个 Python 中线程共享数据的问题
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import unittest
from dataclasses import dataclass, field
from threading import Lock
import multiprocessing

class SharedData:
value: int = 0
# Using a Lock to ensure thread-safety when accessing shared data
lock: Lock = field(default_factory=Lock, init=False, repr=False)

def increment(self):
with self.lock:
self.value += 1

def get_value(self):
with self.lock:
return self.value

def worker(data: SharedData, num_iterations: int):
local_sum = 0
for _ in range(num_iterations):
local_sum += 1
# Use a lock to safely update the shared data
with data.lock:
data.value += local_sum

class TestSharedDataThreadSafety(unittest.TestCase):
def test_concurrent_increments(self):
shared_data = SharedData()
# Use 2x CPU count for threads to test both CPU-bound and I/O-bound scenarios
num_threads = multiprocessing.cpu_count() * 2
num_iterations = 1000000 // num_threads

with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [executor.submit(worker, shared_data, num_iterations) for _ in range(num_threads)]
for future in futures:

expected_value = num_threads * num_iterations
self.assertEqual(shared_data.get_value(), expected_value,
f"Expected {expected_value}, but got {shared_data.get_value()}")

def test_race_condition(self):
shared_data = SharedData()
race_detected = threading.Event()

def racer():
with shared_data.lock:
initial_value = shared_data.value
time.sleep(0.001) # Simulate some work
# Check if the value has changed, which would indicate a race condition
if initial_value == shared_data.value:
shared_data.value += 1

threads = [threading.Thread(target=racer) for _ in range(100)]
for t in threads:
for t in threads:

self.assertFalse(race_detected.is_set(), "Race condition detected")

def test_stress_test(self):
shared_data = SharedData()
stop_flag = threading.Event()

def stress_worker():
local_sum = 0
while not stop_flag.is_set():
local_sum += 1
# Use a lock to safely update the shared data after intensive local computation
with shared_data.lock:
shared_data.value += local_sum

# Use CPU count for threads to maximize resource utilization
threads = [threading.Thread(target=stress_worker) for _ in range(multiprocessing.cpu_count())]
for t in threads:

time.sleep(5) # Run for 5 seconds to simulate prolonged stress

for t in threads:

print(f"Stress test final value: {shared_data.get_value()}")

if __name__ == '__main__':
230 天前
回复了 qianchengv 创建的主题 深圳 深圳哪里有线下 Vision Pro 付费体验?
我看了,太贵 3 天 1500 ,这是最低的租金了
关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2589 人在线   最高记录 6679   ·     Select Language
World is powered by solitude
VERSION: · 21ms · UTC 06:39 · PVG 14:39 · LAX 22:39 · JFK 01:39
Developed with CodeLauncher
♥ Do have faith in what you're doing.