前面我们了解到 langgraph cache 类似于 @functools.lru_cache
,可以缓存函数的返回值,避免重复计算。这一节我们来看 Cache 的具体实现细节。
1. BaseCache
cache 如实现由 BaseCache 抽象基类约定。
1.1 BaseCache 属性
1
2
3
4
5
6
7
8
9
|
class BaseCache(ABC, Generic[ValueT]):
"""Base class for a cache."""
# 序列化器
serde: SerializerProtocol = JsonPlusSerializer(pickle_fallback=True)
def __init__(self, *, serde: SerializerProtocol | None = None) -> None:
"""Initialize the cache with a serializer."""
self.serde = serde or self.serde
|
1.2 BaseCache 方法
BaseCache 定义的方法都是抽象方法。
方法名 |
作用描述 |
输出值类型 |
get |
同步获取多个缓存项的值(根据多个 FullKey ) |
dict[FullKey, ValueT] |
aget |
异步获取多个缓存项的值 |
dict[FullKey, ValueT] |
set |
同步设置多个缓存项的值及其 TTL(以 FullKey 为键,值为 (ValueT, TTL) ) |
None |
aset |
异步设置多个缓存项的值及其 TTL |
None |
clear |
同步清除指定命名空间(Namespace )下的缓存项,若未提供则清空所有 |
None |
aclear |
异步清除指定命名空间(Namespace )下的缓存项,若未提供则清空所有 |
None |
2. InMemoryCache
Langgraph 提供了一个 BaseCache 基于内存的实现 InMemoryCache。我们简单看一下它的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
Namespace = tuple[str, ...]
FullKey = tuple[Namespace, str]
class InMemoryCache(BaseCache[ValueT]):
def __init__(self, *, serde: SerializerProtocol | None = None):
super().__init__(serde=serde)
self._cache: dict[Namespace, dict[str, tuple[str, bytes, float | None]]] = {}
self._lock = threading.RLock()
def get(self, keys: Sequence[FullKey]) -> dict[FullKey, ValueT]:
"""Get the cached values for the given keys."""
with self._lock:
if not keys:
return {}
now = datetime.datetime.now(datetime.timezone.utc).timestamp()
values: dict[FullKey, ValueT] = {}
for ns_tuple, key in keys:
ns = Namespace(ns_tuple)
if ns in self._cache and key in self._cache[ns]:
enc, val, expiry = self._cache[ns][key]
if expiry is None or now < expiry:
values[(ns, key)] = self.serde.loads_typed((enc, val))
else:
del self._cache[ns][key]
return values
def set(self, keys: Mapping[FullKey, tuple[ValueT, int | None]]) -> None:
"""Set the cached values for the given keys."""
with self._lock:
now = datetime.datetime.now(datetime.timezone.utc)
for (ns, key), (value, ttl) in keys.items():
if ttl is not None:
delta = datetime.timedelta(seconds=ttl)
expiry: float | None = (now + delta).timestamp()
else:
expiry = None
if ns not in self._cache:
self._cache[ns] = {}
self._cache[ns][key] = (
*self.serde.dumps_typed(value),
expiry,
)
|
InMemoryCache 的实现很简单,就是一个线程安全的字典,键为 FullKey
,值为 (enc, val, expiry)
。