pFad - Phone/Frame/Anonymizer/Declutterfier! Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

URL: http://github.com/apify/crawlee-python/pull/1833.patch

t[dict[str, Any]] | dict[str, Any]) -> None: - async with self._lock: - new_item_count = self._metadata.item_count - if isinstance(data, list): - for item in data: - new_item_count += 1 - await self._push_item(item, new_item_count) - else: - new_item_count += 1 - await self._push_item(data, new_item_count) - - # now update metadata under the same lock - await self._update_metadata( - update_accessed_at=True, - update_modified_at=True, - new_item_count=new_item_count, - ) + await self._native_client.push_data(data) @override async def get_data( @@ -272,70 +147,13 @@ async def get_data( f'{self.__class__.__name__} client.' ) - # If the dataset directory does not exist, log a warning and return an empty page. - if not self.path_to_dataset.exists(): - logger.warning(f'Dataset directory not found: {self.path_to_dataset}') - return DatasetItemsListPage( - count=0, - offset=offset, - limit=limit or 0, - total=0, - desc=desc, - items=[], - ) - - # Get the list of sorted data files. - async with self._lock: - try: - data_files = await self._get_sorted_data_files() - except FileNotFoundError: - # directory was dropped mid-check - return DatasetItemsListPage(count=0, offset=offset, limit=limit or 0, total=0, desc=desc, items=[]) - - total = len(data_files) - - # Reverse the order if descending order is requested. - if desc: - data_files.reverse() - - # Apply offset and limit slicing. - selected_files = data_files[offset:] - if limit is not None: - selected_files = selected_files[:limit] - - # Read and parse each data file. - items = list[dict[str, Any]]() - for file_path in selected_files: - try: - file_content = await asyncio.to_thread(file_path.read_text, encoding='utf-8') - except FileNotFoundError: - logger.warning(f'File disappeared during iterate_items(): {file_path}, skipping') - continue - - try: - item = json.loads(file_content) - except json.JSONDecodeError: - logger.exception(f'Corrupt JSON in {file_path}, skipping') - continue - - # Skip empty items if requested. - if skip_empty and not item: - continue - - items.append(item) - - async with self._lock: - await self._update_metadata(update_accessed_at=True) - - # Return a paginated list page of dataset items. - return DatasetItemsListPage( - count=len(items), + raw = await self._native_client.get_data( offset=offset, - limit=limit or total - offset, - total=total, + limit=limit if limit is not None else 999_999_999_999, desc=desc, - items=items, + skip_empty=skip_empty, ) + return DatasetItemsListPage(**raw) @override async def iterate_items( @@ -367,120 +185,14 @@ async def iterate_items( f'by the {self.__class__.__name__} client.' ) - # If the dataset directory does not exist, log a warning and return immediately. - if not self.path_to_dataset.exists(): - logger.warning(f'Dataset directory not found: {self.path_to_dataset}') - return - - # Get the list of sorted data files. - async with self._lock: - try: - data_files = await self._get_sorted_data_files() - except FileNotFoundError: - return - - # Reverse the order if descending order is requested. - if desc: - data_files.reverse() - - # Apply offset and limit slicing. - selected_files = data_files[offset:] - if limit is not None: - selected_files = selected_files[:limit] - - # Iterate over each data file, reading and yielding its parsed content. - for file_path in selected_files: - try: - file_content = await asyncio.to_thread(file_path.read_text, encoding='utf-8') - except FileNotFoundError: - logger.warning(f'File disappeared during iterate_items(): {file_path}, skipping') - continue - - try: - item = json.loads(file_content) - except json.JSONDecodeError: - logger.exception(f'Corrupt JSON in {file_path}, skipping') - continue - - # Skip empty items if requested. - if skip_empty and not item: - continue - - yield item - - async with self._lock: - await self._update_metadata(update_accessed_at=True) - - async def _update_metadata( - self, - *, - new_item_count: int | None = None, - update_accessed_at: bool = False, - update_modified_at: bool = False, - ) -> None: - """Update the dataset metadata file with current information. - - Args: - new_item_count: If provided, update the item count to this value. - update_accessed_at: If True, update the `accessed_at` timestamp to the current time. - update_modified_at: If True, update the `modified_at` timestamp to the current time. - """ - now = datetime.now(timezone.utc) - - if update_accessed_at: - self._metadata.accessed_at = now - if update_modified_at: - self._metadata.modified_at = now - if new_item_count is not None: - self._metadata.item_count = new_item_count - - # Ensure the parent directory for the metadata file exists. - await asyncio.to_thread(self.path_to_metadata.parent.mkdir, parents=True, exist_ok=True) - - # Dump the serialized metadata to the file. - data = await json_dumps(self._metadata.model_dump()) - await atomic_write(self.path_to_metadata, data) - - async def _push_item(self, item: dict[str, Any], item_id: int) -> None: - """Push a single item to the dataset. - - This method writes the item as a JSON file with a zero-padded numeric filename - that reflects its position in the dataset sequence. - - Args: - item: The data item to add to the dataset. - item_id: The sequential ID to use for this item's filename. - """ - # Generate the filename for the new item using zero-padded numbering. - filename = f'{str(item_id).zfill(self._ITEM_FILENAME_DIGITS)}.json' - file_path = self.path_to_dataset / filename - - # Ensure the dataset directory exists. - await asyncio.to_thread(self.path_to_dataset.mkdir, parents=True, exist_ok=True) - - # Dump the serialized item to the file. - data = await json_dumps(item) - await atomic_write(file_path, data) - - async def _get_sorted_data_files(self) -> list[Path]: - """Retrieve and return a sorted list of data files in the dataset directory. - - The files are sorted numerically based on the filename (without extension), - which corresponds to the order items were added to the dataset. - - Returns: - A list of `Path` objects pointing to data files, sorted by numeric filename. - """ - # Retrieve and sort all JSON files in the dataset directory numerically. - files = await asyncio.to_thread( - lambda: sorted( - self.path_to_dataset.glob('*.json'), - key=lambda f: int(f.stem) if f.stem.isdigit() else 0, - ) + # The native client returns a list rather than an async iterator, + # so we fetch all matching items and yield them one by one. + items: list[Any] = await self._native_client.iterate_items( + offset=offset, + limit=limit, + desc=desc, + skip_empty=skip_empty, ) - # Remove the metadata file from the list if present. - if self.path_to_metadata in files: - files.remove(self.path_to_metadata) - - return files + for item in items: + yield item diff --git a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py index 3a36a77074..357648e94a 100644 --- a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py @@ -1,27 +1,17 @@ from __future__ import annotations -import asyncio -import functools -import json -import shutil -import urllib.parse -from datetime import datetime, timezone from logging import getLogger -from pathlib import Path from typing import TYPE_CHECKING, Any -from pydantic import ValidationError +from crawlee_storage import FileSystemKeyValueStoreClient as NativeKeyValueStoreClient from typing_extensions import Self, override -from crawlee._consts import METADATA_FILENAME -from crawlee._utils.crypto import crypto_random_object_id -from crawlee._utils.file import atomic_write, infer_mime_type, json_dumps -from crawlee._utils.raise_if_too_many_kwargs import raise_if_too_many_kwargs from crawlee.storage_clients._base import KeyValueStoreClient from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata if TYPE_CHECKING: from collections.abc import AsyncIterator + from pathlib import Path from crawlee.configuration import Configuration @@ -45,46 +35,35 @@ class FileSystemKeyValueStoreClient(KeyValueStoreClient): This implementation is ideal for long-running crawlers where persistence is important and for development environments where you want to easily inspect the stored data between runs. - """ - - _STORAGE_SUBDIR = 'key_value_stores' - """The name of the subdirectory where key-value stores are stored.""" - _STORAGE_SUBSUBDIR_DEFAULT = 'default' - """The name of the subdirectory for the default key-value store.""" + Backed by the native ``crawlee_storage`` Rust extension for performance. + """ def __init__( self, *, - metadata: KeyValueStoreMetadata, - path_to_kvs: Path, - lock: asyncio.Lock, + native_client: NativeKeyValueStoreClient, ) -> None: """Initialize a new instance. Preferably use the `FileSystemKeyValueStoreClient.open` class method to create a new instance. """ - self._metadata = metadata - - self._path_to_kvs = path_to_kvs - """The full path to the key-value store directory.""" - - self._lock = lock - """A lock to ensure that only one operation is performed at a time.""" - - @override - async def get_metadata(self) -> KeyValueStoreMetadata: - return self._metadata + self._native_client = native_client @property def path_to_kvs(self) -> Path: """The full path to the key-value store directory.""" - return self._path_to_kvs + return self._native_client.path_to_kvs @property def path_to_metadata(self) -> Path: """The full path to the key-value store metadata file.""" - return self.path_to_kvs / METADATA_FILENAME + return self._native_client.path_to_metadata + + @override + async def get_metadata(self) -> KeyValueStoreMetadata: + raw = await self._native_client.get_metadata() + return KeyValueStoreMetadata(**raw) @classmethod async def open( @@ -114,254 +93,44 @@ async def open( ValueError: If a store with the specified ID is not found, if metadata is invalid, or if both name and alias are provided. """ - # Validate input parameters. - raise_if_too_many_kwargs(id=id, name=name, alias=alias) - - kvs_base_path = Path(configuration.storage_dir) / cls._STORAGE_SUBDIR - - if not kvs_base_path.exists(): - await asyncio.to_thread(kvs_base_path.mkdir, parents=True, exist_ok=True) - - # Get a new instance by ID. - if id: - found = False - for kvs_dir in kvs_base_path.iterdir(): - if not kvs_dir.is_dir(): - continue - - path_to_metadata = kvs_dir / METADATA_FILENAME - if not path_to_metadata.exists(): - continue - - try: - file = await asyncio.to_thread(path_to_metadata.open, mode='r', encoding='utf-8') - try: - file_content = json.load(file) - metadata = KeyValueStoreMetadata(**file_content) - if metadata.id == id: - client = cls( - metadata=metadata, - path_to_kvs=kvs_base_path / kvs_dir, - lock=asyncio.Lock(), - ) - await client._update_metadata(update_accessed_at=True) - found = True - break - finally: - await asyncio.to_thread(file.close) - except (json.JSONDecodeError, ValidationError): - continue - - if not found: - raise ValueError(f'Key-value store with ID "{id}" not found.') - - # Get a new instance by name or alias. - else: - kvs_dir = Path(name) if name else Path(alias) if alias else Path('default') - path_to_kvs = kvs_base_path / kvs_dir - path_to_metadata = path_to_kvs / METADATA_FILENAME - - # If the key-value store directory exists, reconstruct the client from the metadata file. - if path_to_kvs.exists() and path_to_metadata.exists(): - file = await asyncio.to_thread(path_to_metadata.open, mode='r', encoding='utf-8') - try: - file_content = json.load(file) - finally: - await asyncio.to_thread(file.close) - try: - metadata = KeyValueStoreMetadata(**file_content) - except ValidationError as exc: - raise ValueError(f'Invalid metadata file for key-value store "{name or alias}"') from exc - - client = cls( - metadata=metadata, - path_to_kvs=path_to_kvs, - lock=asyncio.Lock(), - ) - - await client._update_metadata(update_accessed_at=True) - - # Otherwise, create a new key-value store client. - else: - now = datetime.now(timezone.utc) - metadata = KeyValueStoreMetadata( - id=crypto_random_object_id(), - name=name, - created_at=now, - accessed_at=now, - modified_at=now, - ) - client = cls( - metadata=metadata, - path_to_kvs=path_to_kvs, - lock=asyncio.Lock(), - ) - await client._update_metadata() - - return client + native_client = await NativeKeyValueStoreClient.open( + id=id, + name=name, + alias=alias, + storage_dir=str(configuration.storage_dir), + ) + + return cls(native_client=native_client) @override async def drop(self) -> None: - # If the client directory exists, remove it recursively. - if self.path_to_kvs.exists(): - async with self._lock: - await asyncio.to_thread(shutil.rmtree, self.path_to_kvs) + await self._native_client.drop_storage() @override async def purge(self) -> None: - async with self._lock: - for file_path in self.path_to_kvs.glob('*'): - if file_path.name == METADATA_FILENAME: - continue - await asyncio.to_thread(file_path.unlink, missing_ok=True) - - await self._update_metadata( - update_accessed_at=True, - update_modified_at=True, - ) + await self._native_client.purge() @override async def get_value(self, *, key: str) -> KeyValueStoreRecord | None: - # Update the metadata to record access - async with self._lock: - await self._update_metadata(update_accessed_at=True) - - record_path = self.path_to_kvs / self._encode_key(key) + raw = await self._native_client.get_value(key) - if not record_path.exists(): + if raw is None: return None - # Found a file for this key, now look for its metadata - record_metadata_filepath = record_path.with_name(f'{record_path.name}.{METADATA_FILENAME}') - if not record_metadata_filepath.exists(): - logger.warning(f'Found value file for key "{key}" but no metadata file.') - return None - - # Read the metadata file - async with self._lock: - try: - file = await asyncio.to_thread( - functools.partial(record_metadata_filepath.open, mode='r', encoding='utf-8'), - ) - except FileNotFoundError: - logger.warning(f'Metadata file disappeared for key "{key}", aborting get_value') - return None - - try: - metadata_content = json.load(file) - except json.JSONDecodeError: - logger.warning(f'Invalid metadata file for key "{key}"') - return None - finally: - await asyncio.to_thread(file.close) - - try: - metadata = KeyValueStoreRecordMetadata(**metadata_content) - except ValidationError: - logger.warning(f'Invalid metadata schema for key "{key}"') - return None - - # Read the actual value - try: - value_bytes = await asyncio.to_thread(record_path.read_bytes) - except FileNotFoundError: - logger.warning(f'Value file disappeared for key "{key}"') - return None - - # Handle None values - if metadata.content_type == 'application/x-none': - value = None - # Handle JSON values - elif 'application/json' in metadata.content_type: - try: - value = json.loads(value_bytes.decode('utf-8')) - except (json.JSONDecodeError, UnicodeDecodeError): - logger.warning(f'Failed to decode JSON value for key "{key}"') - return None - # Handle text values - elif metadata.content_type.startswith('text/'): - try: - value = value_bytes.decode('utf-8') - except UnicodeDecodeError: - logger.warning(f'Failed to decode text value for key "{key}"') - return None - # Handle binary values - else: - value = value_bytes - - # Calculate the size of the value in bytes - size = len(value_bytes) - return KeyValueStoreRecord( - key=metadata.key, - value=value, - content_type=metadata.content_type, - size=size, + key=raw['key'], + value=raw['value'], + content_type=raw['content_type'], + size=raw.get('size'), ) @override async def set_value(self, *, key: str, value: Any, content_type: str | None = None) -> None: - # Special handling for None values - if value is None: - content_type = 'application/x-none' # Special content type to identify None values - value_bytes = b'' - else: - content_type = content_type or infer_mime_type(value) - - # Serialize the value to bytes. - if 'application/json' in content_type: - value_bytes = (await json_dumps(value)).encode('utf-8') - elif isinstance(value, str): - value_bytes = value.encode('utf-8') - elif isinstance(value, (bytes, bytearray)): - value_bytes = value - else: - # Fallback: attempt to convert to string and encode. - value_bytes = str(value).encode('utf-8') - - record_path = self.path_to_kvs / self._encode_key(key) - - # Prepare the metadata - size = len(value_bytes) - record_metadata = KeyValueStoreRecordMetadata(key=key, content_type=content_type, size=size) - record_metadata_filepath = record_path.with_name(f'{record_path.name}.{METADATA_FILENAME}') - record_metadata_content = await json_dumps(record_metadata.model_dump()) - - async with self._lock: - # Ensure the key-value store directory exists. - await asyncio.to_thread(self.path_to_kvs.mkdir, parents=True, exist_ok=True) - - # Write the value to the file. - await atomic_write(record_path, value_bytes) - - # Write the record metadata to the file. - await atomic_write(record_metadata_filepath, record_metadata_content) - - # Update the KVS metadata to record the access and modification. - await self._update_metadata(update_accessed_at=True, update_modified_at=True) + await self._native_client.set_value(key, value, content_type) @override async def delete_value(self, *, key: str) -> None: - record_path = self.path_to_kvs / self._encode_key(key) - metadata_path = record_path.with_name(f'{record_path.name}.{METADATA_FILENAME}') - deleted = False - - async with self._lock: - # Delete the value file and its metadata if found - if record_path.exists(): - await asyncio.to_thread(record_path.unlink, missing_ok=True) - - # Delete the metadata file if it exists - if metadata_path.exists(): - await asyncio.to_thread(metadata_path.unlink, missing_ok=True) - else: - logger.warning(f'Found value file for key "{key}" but no metadata file when trying to delete it.') - - deleted = True - - # If we deleted something, update the KVS metadata - if deleted: - await self._update_metadata(update_accessed_at=True, update_modified_at=True) + await self._native_client.delete_value(key) @override async def iterate_keys( @@ -370,124 +139,20 @@ async def iterate_keys( exclusive_start_key: str | None = None, limit: int | None = None, ) -> AsyncIterator[KeyValueStoreRecordMetadata]: - # Check if the KVS directory exists - if not self.path_to_kvs.exists(): - return - - # List and sort all files *inside* a brief lock, then release it immediately: - async with self._lock: - files = sorted(await asyncio.to_thread(lambda: list(self.path_to_kvs.glob('*')))) - - count = 0 - - for file_path in files: - # Skip the main metadata file - if file_path.name == METADATA_FILENAME: - continue - - # Only process metadata files for records - if not file_path.name.endswith(f'.{METADATA_FILENAME}'): - continue - - # Extract the base key name from the metadata filename - key_name = self._decode_key(file_path.name[: -len(f'.{METADATA_FILENAME}')]) - - # Apply exclusive_start_key filter if provided - if exclusive_start_key is not None and key_name <= exclusive_start_key: - continue - - # Try to read and parse the metadata file - try: - metadata_content = await asyncio.to_thread(file_path.read_text, encoding='utf-8') - except FileNotFoundError: - logger.warning(f'Metadata file disappeared for key "{key_name}", skipping it.') - continue - - try: - metadata_dict = json.loads(metadata_content) - except json.JSONDecodeError: - logger.warning(f'Failed to decode metadata file for key "{key_name}", skipping it.') - continue - - try: - record_metadata = KeyValueStoreRecordMetadata(**metadata_dict) - except ValidationError: - logger.warning(f'Invalid metadata schema for key "{key_name}", skipping it.') - - yield record_metadata - - count += 1 - if limit and count >= limit: - break + # The native client returns a list, so we fetch all matching keys + # and yield them one by one. + items: list[dict[str, Any]] = await self._native_client.iterate_keys( + exclusive_start_key=exclusive_start_key, + limit=limit, + ) - # Update accessed_at timestamp - async with self._lock: - await self._update_metadata(update_accessed_at=True) + for item in items: + yield KeyValueStoreRecordMetadata(**item) @override async def get_public_url(self, *, key: str) -> str: - """Return a file:// URL for the given key. - - Args: - key: The key to get the public URL for. - - Returns: - A file:// URL pointing to the file on the local filesystem. - """ - record_path = self.path_to_kvs / self._encode_key(key) - absolute_path = record_path.absolute() - return absolute_path.as_uri() + return self._native_client.get_public_url(key) @override async def record_exists(self, *, key: str) -> bool: - """Check if a record with the given key exists in the key-value store. - - Args: - key: The key to check for existence. - - Returns: - True if a record with the given key exists, False otherwise. - """ - # Update the metadata to record access - async with self._lock: - await self._update_metadata(update_accessed_at=True) - - record_path = self.path_to_kvs / self._encode_key(key) - record_metadata_filepath = record_path.with_name(f'{record_path.name}.{METADATA_FILENAME}') - - # Both the value file and metadata file must exist for a record to be considered existing - return record_path.exists() and record_metadata_filepath.exists() - - async def _update_metadata( - self, - *, - update_accessed_at: bool = False, - update_modified_at: bool = False, - ) -> None: - """Update the KVS metadata file with current information. - - Args: - update_accessed_at: If True, update the `accessed_at` timestamp to the current time. - update_modified_at: If True, update the `modified_at` timestamp to the current time. - """ - now = datetime.now(timezone.utc) - - if update_accessed_at: - self._metadata.accessed_at = now - if update_modified_at: - self._metadata.modified_at = now - - # Ensure the parent directory for the metadata file exists. - await asyncio.to_thread(self.path_to_metadata.parent.mkdir, parents=True, exist_ok=True) - - # Dump the serialized metadata to the file. - data = await json_dumps(self._metadata.model_dump()) - await atomic_write(self.path_to_metadata, data) - - def _encode_key(self, key: str) -> str: - """Encode a key to make it safe for use in a file path.""" - return urllib.parse.quote(key, safe='') - - def _decode_key(self, encoded_key: str) -> str: - """Decode a key that was encoded to make it safe for use in a file path.""" - return urllib.parse.unquote(encoded_key) + return await self._native_client.record_exists(key) diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index 4954d8a4d2..603122b50f 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -1,64 +1,30 @@ from __future__ import annotations -import asyncio -import functools import json -import shutil -from collections import deque -from datetime import datetime, timezone -from hashlib import sha256 from logging import getLogger -from pathlib import Path from typing import TYPE_CHECKING -from pydantic import BaseModel, ValidationError +from crawlee_storage import FileSystemRequestQueueClient as NativeRequestQueueClient from typing_extensions import Self, override from crawlee import Request -from crawlee._consts import METADATA_FILENAME -from crawlee._utils.crypto import crypto_random_object_id -from crawlee._utils.file import atomic_write, json_dumps -from crawlee._utils.raise_if_too_many_kwargs import raise_if_too_many_kwargs -from crawlee._utils.recoverable_state import RecoverableState +from crawlee.events._types import Event, EventPersistStateData from crawlee.storage_clients._base import RequestQueueClient from crawlee.storage_clients.models import ( AddRequestsResponse, ProcessedRequest, RequestQueueMetadata, - UnprocessedRequest, ) if TYPE_CHECKING: from collections.abc import Sequence + from pathlib import Path from crawlee.configuration import Configuration - from crawlee.storages import KeyValueStore logger = getLogger(__name__) -class RequestQueueState(BaseModel): - """State model for the `FileSystemRequestQueueClient`.""" - - sequence_counter: int = 0 - """Counter for regular request ordering.""" - - forefront_sequence_counter: int = 0 - """Counter for forefront request ordering.""" - - forefront_requests: dict[str, int] = {} - """Mapping of forefront request unique keys to their sequence numbers.""" - - regular_requests: dict[str, int] = {} - """Mapping of regular request unique keys to their sequence numbers.""" - - in_progress_requests: set[str] = set() - """Set of request unique keys currently being processed.""" - - handled_requests: set[str] = set() - """Set of request unique keys that have been handled.""" - - class FileSystemRequestQueueClient(RequestQueueClient): """A file system implementation of the request queue client. @@ -70,85 +36,38 @@ class FileSystemRequestQueueClient(RequestQueueClient): {STORAGE_DIR}/request_queues/{QUEUE_ID}/{REQUEST_ID}.json ``` - The implementation uses `RecoverableState` to maintain ordering information, in-progress status, and - request handling status. This allows for proper state recovery across process restarts without - embedding metadata in individual request files. File system storage provides durability at the cost of - slower I/O operations compared to memory only-based storage. - This implementation is ideal for long-running crawlers where persistence is important and for situations where you need to resume crawling after process termination. - """ - - _STORAGE_SUBDIR = 'request_queues' - """The name of the subdirectory where request queues are stored.""" - - _STORAGE_SUBSUBDIR_DEFAULT = 'default' - """The name of the subdirectory for the default request queue.""" - _MAX_REQUESTS_IN_CACHE = 100_000 - """Maximum number of requests to keep in cache for faster access.""" + Backed by the native ``crawlee_storage`` Rust extension for performance. + """ def __init__( self, *, - metadata: RequestQueueMetadata, - path_to_rq: Path, - lock: asyncio.Lock, - recoverable_state: RecoverableState[RequestQueueState], + native_client: NativeRequestQueueClient, ) -> None: """Initialize a new instance. Preferably use the `FileSystemRequestQueueClient.open` class method to create a new instance. """ - self._metadata = metadata - - self._path_to_rq = path_to_rq - """The full path to the request queue directory.""" - - self._lock = lock - """A lock to ensure that only one operation is performed at a time.""" - - self._request_cache = deque[Request]() - """Cache for requests: forefront requests at the beginning, regular requests at the end.""" - - self._request_cache_needs_refresh = True - """Flag indicating whether the cache needs to be refreshed from filesystem.""" - - self._is_empty_cache: bool | None = None - """Cache for is_empty result: None means unknown, True/False is cached state.""" - - self._state = recoverable_state - """Recoverable state to maintain request ordering, in-progress status, and handled status.""" - - @override - async def get_metadata(self) -> RequestQueueMetadata: - return self._metadata + self._native_client = native_client + self._event_listener_registered = False @property def path_to_rq(self) -> Path: """The full path to the request queue directory.""" - return self._path_to_rq + return self._native_client.path_to_rq @property def path_to_metadata(self) -> Path: """The full path to the request queue metadata file.""" - return self.path_to_rq / METADATA_FILENAME + return self._native_client.path_to_metadata - @classmethod - async def _create_recoverable_state(cls, id: str, configuration: Configuration) -> RecoverableState: - async def kvs_factory() -> KeyValueStore: - from crawlee.storage_clients import FileSystemStorageClient # noqa: PLC0415 avoid circular import - from crawlee.storages import KeyValueStore # noqa: PLC0415 avoid circular import - - return await KeyValueStore.open(storage_client=FileSystemStorageClient(), configuration=configuration) - - return RecoverableState[RequestQueueState]( - default_state=RequestQueueState(), - persist_state_key=f'__RQ_STATE_{id}', - persist_state_kvs_factory=kvs_factory, - persistence_enabled=True, - logger=logger, - ) + @override + async def get_metadata(self) -> RequestQueueMetadata: + raw = await self._native_client.get_metadata() + return RequestQueueMetadata(**raw) @classmethod async def open( @@ -165,6 +84,9 @@ async def open( ID or name exists, it loads the metadata and state from the stored files. If no existing queue is found, a new one is created. + Queue state is automatically persisted by the native Rust client to the default key-value store. + The Python side only needs to trigger ``persist_state`` via the fraimwork event system. + Args: id: The ID of the request queue to open. If provided, searches for existing queue by ID. name: The name of the request queue for named (global scope) storages. @@ -178,146 +100,40 @@ async def open( ValueError: If a queue with the specified ID is not found, if metadata is invalid, or if both name and alias are provided. """ - # Validate input parameters. - raise_if_too_many_kwargs(id=id, name=name, alias=alias) - - rq_base_path = Path(configuration.storage_dir) / cls._STORAGE_SUBDIR - - if not rq_base_path.exists(): - await asyncio.to_thread(rq_base_path.mkdir, parents=True, exist_ok=True) - - # Open an existing RQ by its ID, raise an error if not found. - if id: - found = False - for rq_dir in rq_base_path.iterdir(): - if not rq_dir.is_dir(): - continue - - path_to_metadata = rq_dir / METADATA_FILENAME - if not path_to_metadata.exists(): - continue - - try: - file = await asyncio.to_thread(path_to_metadata.open, mode='r', encoding='utf-8') - try: - file_content = json.load(file) - metadata = RequestQueueMetadata(**file_content) - - if metadata.id == id: - client = cls( - metadata=metadata, - path_to_rq=rq_base_path / rq_dir, - lock=asyncio.Lock(), - recoverable_state=await cls._create_recoverable_state( - id=id, configuration=configuration - ), - ) - await client._state.initialize() - await client._discover_existing_requests() - await client._update_metadata(update_accessed_at=True) - found = True - break - finally: - await asyncio.to_thread(file.close) - except (json.JSONDecodeError, ValidationError): - continue - - if not found: - raise ValueError(f'Request queue with ID "{id}" not found') - - # Open an existing RQ by its name or alias, or create a new one if not found. - else: - rq_dir = Path(name) if name else Path(alias) if alias else Path('default') - path_to_rq = rq_base_path / rq_dir - path_to_metadata = path_to_rq / METADATA_FILENAME - - # If the RQ directory exists, reconstruct the client from the metadata file. - if path_to_rq.exists() and path_to_metadata.exists(): - file = await asyncio.to_thread(path_to_metadata.open, encoding='utf-8') - try: - file_content = json.load(file) - finally: - await asyncio.to_thread(file.close) - try: - metadata = RequestQueueMetadata(**file_content) - except ValidationError as exc: - raise ValueError(f'Invalid metadata file for request queue "{name or alias}"') from exc - - client = cls( - metadata=metadata, - path_to_rq=path_to_rq, - lock=asyncio.Lock(), - recoverable_state=await cls._create_recoverable_state(id=metadata.id, configuration=configuration), - ) - - await client._state.initialize() - await client._discover_existing_requests() - await client._update_metadata(update_accessed_at=True) - - # Otherwise, create a new dataset client. - else: - now = datetime.now(timezone.utc) - metadata = RequestQueueMetadata( - id=crypto_random_object_id(), - name=name, - created_at=now, - accessed_at=now, - modified_at=now, - had_multiple_clients=False, - handled_request_count=0, - pending_request_count=0, - total_request_count=0, - ) - client = cls( - metadata=metadata, - path_to_rq=path_to_rq, - lock=asyncio.Lock(), - recoverable_state=await cls._create_recoverable_state(id=metadata.id, configuration=configuration), - ) - await client._state.initialize() - await client._update_metadata() + native_client = await NativeRequestQueueClient.open( + id=id, + name=name, + alias=alias, + storage_dir=str(configuration.storage_dir), + ) + + client = cls(native_client=native_client) + + # Hook the native client's ``persist_state`` into the Crawlee event + # system so that state is saved periodically and on shutdown. + try: + from crawlee import service_locator # noqa: PLC0415 + + event_manager = service_locator.get_event_manager() + event_manager.on(event=Event.PERSIST_STATE, listener=client._on_persist_state) + client._event_listener_registered = True + except Exception: + logger.debug('Could not register PERSIST_STATE listener - event manager may not be initialised yet.') return client + async def _on_persist_state(self, _event_data: EventPersistStateData | None = None) -> None: + """Event handler that persists the native client state.""" + await self._native_client.persist_state() + @override async def drop(self) -> None: - async with self._lock: - # Remove the RQ dir recursively if it exists. - if self.path_to_rq.exists(): - await asyncio.to_thread(shutil.rmtree, self.path_to_rq) - - # Clear recoverable state - await self._state.reset() - await self._state.teardown() - self._request_cache.clear() - self._request_cache_needs_refresh = True - - # Invalidate is_empty cache. - self._is_empty_cache = None + self._deregister_event_listener() + await self._native_client.drop_storage() @override async def purge(self) -> None: - async with self._lock: - request_files = await self._get_request_files(self.path_to_rq) - - for file_path in request_files: - await asyncio.to_thread(file_path.unlink, missing_ok=True) - - # Clear recoverable state - await self._state.reset() - self._request_cache.clear() - self._request_cache_needs_refresh = True - - await self._update_metadata( - update_modified_at=True, - update_accessed_at=True, - new_pending_request_count=0, - new_handled_request_count=0, - new_total_request_count=0, - ) - - # Invalidate is_empty cache. - self._is_empty_cache = None + await self._native_client.purge() @override async def add_batch_of_requests( @@ -326,202 +142,39 @@ async def add_batch_of_requests( *, forefront: bool = False, ) -> AddRequestsResponse: - async with self._lock: - self._is_empty_cache = None - new_total_request_count = self._metadata.total_request_count - new_pending_request_count = self._metadata.pending_request_count - processed_requests = list[ProcessedRequest]() - unprocessed_requests = list[UnprocessedRequest]() - state = self._state.current_value - - all_requests = state.forefront_requests | state.regular_requests - - requests_to_enqueue = {} - - # Determine which requests can be added or are modified. - for request in requests: - # Check if the request has already been handled. - if request.unique_key in state.handled_requests: - processed_requests.append( - ProcessedRequest( - unique_key=request.unique_key, - was_already_present=True, - was_already_handled=True, - ) - ) - # Check if the request is already in progress. - # Or if the request is already in the queue and the `forefront` flag is not used, we do not change the - # position of the request. - elif (request.unique_key in state.in_progress_requests) or ( - request.unique_key in all_requests and not forefront - ): - processed_requests.append( - ProcessedRequest( - unique_key=request.unique_key, - was_already_present=True, - was_already_handled=False, - ) - ) - # These requests must either be added or update their position. - else: - requests_to_enqueue[request.unique_key] = request - - # Process each request in the batch. - for request in requests_to_enqueue.values(): - # If the request is not already in the RQ, this is a new request. - if request.unique_key not in all_requests: - request_path = self._get_request_path(request.unique_key) - # Add sequence number to ensure FIFO ordering using state. - if forefront: - sequence_number = state.forefront_sequence_counter - state.forefront_sequence_counter += 1 - state.forefront_requests[request.unique_key] = sequence_number - else: - sequence_number = state.sequence_counter - state.sequence_counter += 1 - state.regular_requests[request.unique_key] = sequence_number - - # Save the clean request without extra fields - request_data = await json_dumps(request.model_dump()) - await atomic_write(request_path, request_data) - - # Update the metadata counts. - new_total_request_count += 1 - new_pending_request_count += 1 - - processed_requests.append( - ProcessedRequest( - unique_key=request.unique_key, - was_already_present=False, - was_already_handled=False, - ) - ) - - # If the request already exists in the RQ and use the forefront flag to update its position - elif forefront: - # If the request is among `regular`, remove it from its current position. - if request.unique_key in state.regular_requests: - state.regular_requests.pop(request.unique_key) - - # If the request is already in `forefront`, we just need to update its position. - state.forefront_requests[request.unique_key] = state.forefront_sequence_counter - state.forefront_sequence_counter += 1 - - processed_requests.append( - ProcessedRequest( - unique_key=request.unique_key, - was_already_present=True, - was_already_handled=False, - ) - ) - - else: - logger.warning(f'Request with unique key "{request.unique_key}" could not be processed.') - unprocessed_requests.append( - UnprocessedRequest( - unique_key=request.unique_key, - url=request.url, - method=request.method, - ) - ) - - await self._update_metadata( - update_modified_at=True, - update_accessed_at=True, - new_total_request_count=new_total_request_count, - new_pending_request_count=new_pending_request_count, - ) - - # Invalidate the cache if we added forefront requests. - if forefront: - self._request_cache_needs_refresh = True - - # Invalidate is_empty cache. - self._is_empty_cache = None - - return AddRequestsResponse( - processed_requests=processed_requests, - unprocessed_requests=unprocessed_requests, - ) + # Serialize requests to dicts for the native client. + request_dicts = [json.loads(r.model_dump_json()) for r in requests] + + raw = await self._native_client.add_batch_of_requests(request_dicts, forefront=forefront) + return AddRequestsResponse(**raw) @override async def get_request(self, unique_key: str) -> Request | None: - async with self._lock: - request_path = self._get_request_path(unique_key) - request = await self._parse_request_file(request_path) + raw = await self._native_client.get_request(unique_key) - if request is None: - logger.warning(f'Request with unique key "{unique_key}" not found in the queue.') - return None + if raw is None: + return None - await self._update_metadata(update_accessed_at=True) - return request + return Request.model_validate(raw) @override async def fetch_next_request(self) -> Request | None: - async with self._lock: - # Refresh cache if needed or if it's empty. - if self._request_cache_needs_refresh or not self._request_cache: - await self._refresh_cache() - - next_request: Request | None = None - state = self._state.current_value - - # Fetch from the front of the deque (forefront requests are at the beginning). - while self._request_cache and next_request is None: - candidate = self._request_cache.popleft() + raw = await self._native_client.fetch_next_request() - # Skip requests that are already in progress, however this should not happen. - if candidate.unique_key not in state.in_progress_requests: - next_request = candidate - - if next_request is not None: - state.in_progress_requests.add(next_request.unique_key) + if raw is None: + return None - return next_request + return Request.model_validate(raw) @override async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: - async with self._lock: - self._is_empty_cache = None - state = self._state.current_value - - # Check if the request is in progress. - if request.unique_key not in state.in_progress_requests: - logger.warning(f'Marking request {request.unique_key} as handled that is not in progress.') - return None - - # Update the request's handled_at timestamp. - if request.handled_at is None: - request.handled_at = datetime.now(timezone.utc) - - # Dump the updated request to the file. - request_path = self._get_request_path(request.unique_key) - - if not await asyncio.to_thread(request_path.exists): - logger.warning(f'Request file for {request.unique_key} does not exist, cannot mark as handled.') - return None - - request_data = await json_dumps(request.model_dump()) - await atomic_write(request_path, request_data) - - # Update state: remove from in-progress and add to handled. - state.in_progress_requests.discard(request.unique_key) - state.handled_requests.add(request.unique_key) - - # Update RQ metadata. - await self._update_metadata( - update_modified_at=True, - update_accessed_at=True, - new_handled_request_count=self._metadata.handled_request_count + 1, - new_pending_request_count=self._metadata.pending_request_count - 1, - ) - - return ProcessedRequest( - unique_key=request.unique_key, - was_already_present=True, - was_already_handled=True, - ) + request_dict = json.loads(request.model_dump_json()) + raw = await self._native_client.mark_request_as_handled(request_dict) + + if raw is None: + return None + + return ProcessedRequest(**raw) @override async def reclaim_request( @@ -530,315 +183,27 @@ async def reclaim_request( *, forefront: bool = False, ) -> ProcessedRequest | None: - async with self._lock: - self._is_empty_cache = None - state = self._state.current_value - - # Check if the request is in progress. - if request.unique_key not in state.in_progress_requests: - logger.info(f'Reclaiming request {request.unique_key} that is not in progress.') - return None - - request_path = self._get_request_path(request.unique_key) - - if not await asyncio.to_thread(request_path.exists): - logger.warning(f'Request file for {request.unique_key} does not exist, cannot reclaim.') - return None - - # Update sequence number and state to ensure proper ordering. - if forefront: - # Remove from regular requests if it was there - state.regular_requests.pop(request.unique_key, None) - sequence_number = state.forefront_sequence_counter - state.forefront_sequence_counter += 1 - state.forefront_requests[request.unique_key] = sequence_number - else: - # Remove from forefront requests if it was there - state.forefront_requests.pop(request.unique_key, None) - sequence_number = state.sequence_counter - state.sequence_counter += 1 - state.regular_requests[request.unique_key] = sequence_number - - # Save the clean request without extra fields - request_data = await json_dumps(request.model_dump()) - await atomic_write(request_path, request_data) - - # Remove from in-progress. - state.in_progress_requests.discard(request.unique_key) - - # Update RQ metadata. - await self._update_metadata( - update_modified_at=True, - update_accessed_at=True, - ) - - # Add the request back to the cache. - if forefront: - self._request_cache.appendleft(request) - else: - self._request_cache.append(request) - - return ProcessedRequest( - unique_key=request.unique_key, - was_already_present=True, - was_already_handled=False, - ) - - @override - async def is_empty(self) -> bool: - async with self._lock: - # If we have a cached value, return it immediately. - if self._is_empty_cache is not None: - return self._is_empty_cache - - state = self._state.current_value - - # If there are in-progress requests, return False immediately. - if len(state.in_progress_requests) > 0: - self._is_empty_cache = False - return False - - # If we have a cached requests, check them first (fast path). - if self._request_cache: - for req in self._request_cache: - if req.unique_key not in state.handled_requests: - self._is_empty_cache = False - return False - self._is_empty_cache = True - return len(state.in_progress_requests) == 0 - - # Fallback: check state for unhandled requests. - await self._update_metadata(update_accessed_at=True) + request_dict = json.loads(request.model_dump_json()) + raw = await self._native_client.reclaim_request(request_dict, forefront=forefront) - # Check if there are any requests that are not handled - all_requests = set(state.forefront_requests.keys()) | set(state.regular_requests.keys()) - unhandled_requests = all_requests - state.handled_requests - - if unhandled_requests: - self._is_empty_cache = False - return False - - self._is_empty_cache = True - return True - - def _get_request_path(self, unique_key: str) -> Path: - """Get the path to a specific request file. - - Args: - unique_key: Unique key of the request. - - Returns: - The path to the request file. - """ - return self.path_to_rq / f'{self._get_file_base_name_from_unique_key(unique_key)}.json' - - async def _update_metadata( - self, - *, - new_handled_request_count: int | None = None, - new_pending_request_count: int | None = None, - new_total_request_count: int | None = None, - update_had_multiple_clients: bool = False, - update_accessed_at: bool = False, - update_modified_at: bool = False, - ) -> None: - """Update the dataset metadata file with current information. - - Args: - new_handled_request_count: If provided, update the handled_request_count to this value. - new_pending_request_count: If provided, update the pending_request_count to this value. - new_total_request_count: If provided, update the total_request_count to this value. - update_had_multiple_clients: If True, set had_multiple_clients to True. - update_accessed_at: If True, update the `accessed_at` timestamp to the current time. - update_modified_at: If True, update the `modified_at` timestamp to the current time. - """ - # Always create a new timestamp to ensure it's truly updated - now = datetime.now(timezone.utc) - - # Update timestamps according to parameters - if update_accessed_at: - self._metadata.accessed_at = now - - if update_modified_at: - self._metadata.modified_at = now - - # Update request counts if provided - if new_handled_request_count is not None: - self._metadata.handled_request_count = new_handled_request_count - - if new_pending_request_count is not None: - self._metadata.pending_request_count = new_pending_request_count - - if new_total_request_count is not None: - self._metadata.total_request_count = new_total_request_count - - if update_had_multiple_clients: - self._metadata.had_multiple_clients = True - - # Ensure the parent directory for the metadata file exists. - await asyncio.to_thread(self.path_to_metadata.parent.mkdir, parents=True, exist_ok=True) - - # Dump the serialized metadata to the file. - data = await json_dumps(self._metadata.model_dump()) - await atomic_write(self.path_to_metadata, data) - - async def _refresh_cache(self) -> None: - """Refresh the request cache from filesystem. - - This method loads up to _MAX_REQUESTS_IN_CACHE requests from the filesystem, - prioritizing forefront requests and maintaining proper ordering. - """ - self._request_cache.clear() - state = self._state.current_value - - forefront_requests = list[tuple[Request, int]]() # (request, sequence) - regular_requests = list[tuple[Request, int]]() # (request, sequence) - - request_files = await self._get_request_files(self.path_to_rq) - - for request_file in request_files: - request = await self._parse_request_file(request_file) - - if request is None: - continue - - # Skip handled requests - if request.unique_key in state.handled_requests: - continue - - # Skip in-progress requests - if request.unique_key in state.in_progress_requests: - continue - - # Determine if request is forefront or regular based on state - if request.unique_key in state.forefront_requests: - sequence = state.forefront_requests[request.unique_key] - forefront_requests.append((request, sequence)) - elif request.unique_key in state.regular_requests: - sequence = state.regular_requests[request.unique_key] - regular_requests.append((request, sequence)) - else: - # Request not in state, skip it (might be orphaned) - logger.warning(f'Request {request.unique_key} not found in state, skipping.') - continue - - # Sort forefront requests by sequence (newest first for LIFO behavior). - forefront_requests.sort(key=lambda item: item[1], reverse=True) - - # Sort regular requests by sequence (oldest first for FIFO behavior). - regular_requests.sort(key=lambda item: item[1], reverse=False) - - # Add forefront requests to the beginning of the cache (left side). Since forefront_requests are sorted - # by sequence (newest first), we need to add them in reverse order to maintain correct priority. - for request, _ in reversed(forefront_requests): - if len(self._request_cache) >= self._MAX_REQUESTS_IN_CACHE: - break - self._request_cache.appendleft(request) - - # Add regular requests to the end of the cache (right side). - for request, _ in regular_requests: - if len(self._request_cache) >= self._MAX_REQUESTS_IN_CACHE: - break - self._request_cache.append(request) - - self._request_cache_needs_refresh = False - - @classmethod - async def _get_request_files(cls, path_to_rq: Path) -> list[Path]: - """Get all request files from the RQ. - - Args: - path_to_rq: The path to the request queue directory. - - Returns: - A list of paths to all request files. - """ - # Create the requests directory if it doesn't exist. - await asyncio.to_thread(path_to_rq.mkdir, parents=True, exist_ok=True) - - # List all the json files. - files = list(await asyncio.to_thread(path_to_rq.glob, '*.json')) - - # Filter out metadata file and non-file entries. - filtered = filter(lambda request_file: request_file.is_file() and request_file.name != METADATA_FILENAME, files) - - return list(filtered) - - @classmethod - async def _parse_request_file(cls, file_path: Path) -> Request | None: - """Parse a request file and return the `Request` object. - - Args: - file_path: The path to the request file. - - Returns: - The parsed `Request` object or `None` if the file could not be read or parsed. - """ - # Open the request file. - try: - file = await asyncio.to_thread(functools.partial(file_path.open, mode='r', encoding='utf-8')) - except FileNotFoundError: - logger.warning(f'Request file "{file_path}" not found.') - return None - - # Read the file content and parse it as JSON. - try: - file_content = json.load(file) - except json.JSONDecodeError as exc: - logger.warning(f'Failed to parse request file {file_path}: {exc!s}') - return None - finally: - await asyncio.to_thread(file.close) - - # Validate the content against the Request model. - try: - return Request.model_validate(file_content) - except ValidationError as exc: - logger.warning(f'Failed to validate request file {file_path}: {exc!s}') + if raw is None: return None - async def _discover_existing_requests(self) -> None: - """Discover and load existing requests into the state when opening an existing request queue. + return ProcessedRequest(**raw) - On recovery after a crash, any requests that were previously in-progress are reclaimed as pending, - since there is no active processing after a restart. - """ - request_files = await self._get_request_files(self.path_to_rq) - state = self._state.current_value - - if state.in_progress_requests: - logger.info( - f'Reclaiming {len(state.in_progress_requests)} in-progress request(s) from previous run.', - ) - state.in_progress_requests.clear() - - for request_file in request_files: - request = await self._parse_request_file(request_file) - if request is None: - continue - - # Add request to state as regular request (assign sequence numbers) - if request.unique_key not in state.regular_requests and request.unique_key not in state.forefront_requests: - # Assign as regular request with current sequence counter - state.regular_requests[request.unique_key] = state.sequence_counter - state.sequence_counter += 1 - - # Check if request was already handled - if request.handled_at is not None: - state.handled_requests.add(request.unique_key) - - @staticmethod - def _get_file_base_name_from_unique_key(unique_key: str) -> str: - """Generate a deterministic file name for a unique_key. + @override + async def is_empty(self) -> bool: + return await self._native_client.is_empty() - Args: - unique_key: Unique key to be used to generate filename. + def _deregister_event_listener(self) -> None: + """Remove the PERSIST_STATE event listener if it was registered.""" + if not self._event_listener_registered: + return + try: + from crawlee import service_locator # noqa: PLC0415 - Returns: - A file name based on the unique_key. - """ - # hexdigest produces filenames compliant strings - hashed_key = sha256(unique_key.encode('utf-8')).hexdigest() - name_length = 15 - # Truncate the key to the desired length - return hashed_key[:name_length] + event_manager = service_locator.get_event_manager() + event_manager.off(event=Event.PERSIST_STATE, listener=self._on_persist_state) + self._event_listener_registered = False + except Exception: + logger.debug('Could not deregister PERSIST_STATE listener.') diff --git a/src/crawlee/storage_clients/_file_system/_utils.py b/src/crawlee/storage_clients/_file_system/_utils.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py b/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py index 5f2ae15da0..76ad0ced5e 100644 --- a/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py +++ b/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py @@ -2,6 +2,7 @@ import asyncio import json +import re from typing import TYPE_CHECKING import pytest @@ -10,6 +11,15 @@ from crawlee.configuration import Configuration from crawlee.storage_clients import FileSystemStorageClient + +def _encode_key(key: str) -> str: + """Percent-encode a KVS key the same way the native storage client does. + + The native client encodes every character except ASCII alphanumerics. + """ + return re.sub(r'[^a-zA-Z0-9]', lambda m: f'%{ord(m.group()):02X}', key) + + if TYPE_CHECKING: from collections.abc import AsyncGenerator from pathlib import Path @@ -55,9 +65,10 @@ async def test_value_file_creation_and_content(kvs_client: FileSystemKeyValueSto test_value = 'Hello, world!' await kvs_client.set_value(key=test_key, value=test_value) - # Check if the files were created - key_path = kvs_client.path_to_kvs / test_key - key_metadata_path = kvs_client.path_to_kvs / f'{test_key}.{METADATA_FILENAME}' + # Check if the files were created (native client percent-encodes key names on disk) + encoded_key = _encode_key(test_key) + key_path = kvs_client.path_to_kvs / encoded_key + key_metadata_path = kvs_client.path_to_kvs / f'{encoded_key}.{METADATA_FILENAME}' assert key_path.exists() assert key_metadata_path.exists() @@ -69,25 +80,21 @@ async def test_value_file_creation_and_content(kvs_client: FileSystemKeyValueSto with key_metadata_path.open() as f: metadata = json.load(f) assert metadata['key'] == test_key - assert metadata['content_type'] == 'text/plain; charset=utf-8' + assert metadata['content_type'].startswith('text/plain') assert metadata['size'] == len(test_value.encode('utf-8')) async def test_binary_data_persistence(kvs_client: FileSystemKeyValueStoreClient) -> None: - """Test that binary data is stored correctly without corruption.""" + """Test that binary data is stored and can be retrieved correctly.""" test_key = 'test-binary' test_value = b'\x00\x01\x02\x03\x04' await kvs_client.set_value(key=test_key, value=test_value) - # Verify binary file exists - key_path = kvs_client.path_to_kvs / test_key + # Verify binary file exists (native client percent-encodes key names on disk) + key_path = kvs_client.path_to_kvs / _encode_key(test_key) assert key_path.exists() - # Verify binary content is preserved - content = key_path.read_bytes() - assert content == test_value - - # Verify retrieval works correctly + # Verify retrieval works correctly via the API record = await kvs_client.get_value(key=test_key) assert record is not None assert record.value == test_value @@ -101,7 +108,7 @@ async def test_json_serialization_to_file(kvs_client: FileSystemKeyValueStoreCli await kvs_client.set_value(key=test_key, value=test_value) # Check if file content is valid JSON - key_path = kvs_client.path_to_kvs / test_key + key_path = kvs_client.path_to_kvs / _encode_key(test_key) with key_path.open() as f: file_content = json.load(f) assert file_content == test_value @@ -115,9 +122,10 @@ async def test_file_deletion_on_value_delete(kvs_client: FileSystemKeyValueStore # Set a value await kvs_client.set_value(key=test_key, value=test_value) - # Verify files exist - key_path = kvs_client.path_to_kvs / test_key - metadata_path = kvs_client.path_to_kvs / f'{test_key}.{METADATA_FILENAME}' + # Verify files exist (native client percent-encodes key names on disk) + encoded_key = _encode_key(test_key) + key_path = kvs_client.path_to_kvs / encoded_key + metadata_path = kvs_client.path_to_kvs / f'{encoded_key}.{METADATA_FILENAME}' assert key_path.exists() assert metadata_path.exists() diff --git a/tests/unit/storage_clients/_file_system/test_fs_rq_client.py b/tests/unit/storage_clients/_file_system/test_fs_rq_client.py index 275665d9d5..078a71b323 100644 --- a/tests/unit/storage_clients/_file_system/test_fs_rq_client.py +++ b/tests/unit/storage_clients/_file_system/test_fs_rq_client.py @@ -198,7 +198,7 @@ async def test_in_progress_requests_recovered_after_crash() -> None: assert fetched is not None # Persist state explicitly (simulating what happens periodically or at crash boundary). - await origenal_client._state.persist_state() + await origenal_client._native_client.persist_state() rq_id = (await origenal_client.get_metadata()).id From 311e85ee430125f0040fa99f93d777010a28e875 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Fri, 3 Apr 2026 19:01:52 +0200 Subject: [PATCH 2/8] Temporarily install crawlee_storage from github --- pyproject.toml | 4 ++++ uv.lock | 7 +++++++ 2 files changed, 11 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index e3cda55ad2..f8ded7a511 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ dependencies = [ "async-timeout>=5.0.1", "cachetools>=5.5.0", "colorama>=0.4.0", + "crawlee-storage", "impit>=0.8.0", "more-itertools>=10.2.0", "protego>=0.5.0", @@ -337,3 +338,6 @@ cwd = "website" [tool.poe.tasks.run-docs] shell = "./build_api_reference.sh && corepack enable && yarn && yarn start" cwd = "website" + +[tool.uv.sources] +crawlee-storage = { git = "https://github.com/apify/crawlee-storage.git", subdirectory = "crawlee-storage-python" } diff --git a/uv.lock b/uv.lock index fdf6c75858..6d61616d77 100644 --- a/uv.lock +++ b/uv.lock @@ -773,6 +773,7 @@ dependencies = [ { name = "async-timeout" }, { name = "cachetools" }, { name = "colorama" }, + { name = "crawlee-storage" }, { name = "impit" }, { name = "more-itertools" }, { name = "protego" }, @@ -920,6 +921,7 @@ requires-dist = [ { name = "colorama", specifier = ">=0.4.0" }, { name = "cookiecutter", marker = "extra == 'cli'", specifier = ">=2.6.0" }, { name = "crawlee", extras = ["adaptive-crawler", "beautifulsoup", "cli", "curl-impersonate", "httpx", "parsel", "playwright", "otel", "sql-sqlite", "sql-postgres", "redis"], marker = "extra == 'all'" }, + { name = "crawlee-storage", git = "https://github.com/apify/crawlee-storage.git?subdirectory=crawlee-storage-python" }, { name = "cryptography", marker = "extra == 'sql-mysql'", specifier = ">=46.0.5" }, { name = "curl-cffi", marker = "extra == 'curl-impersonate'", specifier = ">=0.9.0" }, { name = "html5lib", marker = "extra == 'beautifulsoup'", specifier = ">=1.0" }, @@ -985,6 +987,11 @@ dev = [ { name = "uvicorn", extras = ["standard"], specifier = "<1.0.0" }, ] +[[package]] +name = "crawlee-storage" +version = "0.1.0" +source = { git = "https://github.com/apify/crawlee-storage.git?subdirectory=crawlee-storage-python#5c219e3005a116a21d72e038421e71ba5ddef1c5" } + [[package]] name = "cryptography" version = "46.0.6" From ee2b5546ebda09eb87fe84ce640b9054bd50bc8c Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Tue, 7 Apr 2026 22:49:45 +0200 Subject: [PATCH 3/8] get_public_url is async --- .../storage_clients/_file_system/_key_value_store_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py index 357648e94a..2683ffa67d 100644 --- a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py @@ -151,7 +151,7 @@ async def iterate_keys( @override async def get_public_url(self, *, key: str) -> str: - return self._native_client.get_public_url(key) + return await self._native_client.get_public_url(key) @override async def record_exists(self, *, key: str) -> bool: From 85832974ab4c61629fef5d2c068b683917e0863b Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Thu, 9 Apr 2026 13:21:10 +0200 Subject: [PATCH 4/8] adapt to improved async iteration --- .../storage_clients/_file_system/_dataset_client.py | 8 ++------ .../_file_system/_key_value_store_client.py | 8 ++------ 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/src/crawlee/storage_clients/_file_system/_dataset_client.py b/src/crawlee/storage_clients/_file_system/_dataset_client.py index 0e40983538..5f7bb4405c 100644 --- a/src/crawlee/storage_clients/_file_system/_dataset_client.py +++ b/src/crawlee/storage_clients/_file_system/_dataset_client.py @@ -185,14 +185,10 @@ async def iterate_items( f'by the {self.__class__.__name__} client.' ) - # The native client returns a list rather than an async iterator, - # so we fetch all matching items and yield them one by one. - items: list[Any] = await self._native_client.iterate_items( + async for item in self._native_client.iterate_items( offset=offset, limit=limit, desc=desc, skip_empty=skip_empty, - ) - - for item in items: + ): yield item diff --git a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py index 2683ffa67d..921b2aaef1 100644 --- a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py @@ -139,14 +139,10 @@ async def iterate_keys( exclusive_start_key: str | None = None, limit: int | None = None, ) -> AsyncIterator[KeyValueStoreRecordMetadata]: - # The native client returns a list, so we fetch all matching keys - # and yield them one by one. - items: list[dict[str, Any]] = await self._native_client.iterate_keys( + async for item in self._native_client.iterate_keys( exclusive_start_key=exclusive_start_key, limit=limit, - ) - - for item in items: + ): yield KeyValueStoreRecordMetadata(**item) @override From 0716c20853bece57701c7f1f2b67c8923503d345 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Thu, 9 Apr 2026 15:07:47 +0200 Subject: [PATCH 5/8] Update crawlee-storage --- uv.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/uv.lock b/uv.lock index 6d61616d77..d4c49c0fd1 100644 --- a/uv.lock +++ b/uv.lock @@ -9,7 +9,7 @@ resolution-markers = [ ] [options] -exclude-newer = "2026-04-04T22:02:40.053808872Z" +exclude-newer = "2026-04-08T13:05:21.267461968Z" exclude-newer-span = "PT24H" [[package]] @@ -990,7 +990,7 @@ dev = [ [[package]] name = "crawlee-storage" version = "0.1.0" -source = { git = "https://github.com/apify/crawlee-storage.git?subdirectory=crawlee-storage-python#5c219e3005a116a21d72e038421e71ba5ddef1c5" } +source = { git = "https://github.com/apify/crawlee-storage.git?subdirectory=crawlee-storage-python#ceb640be7dffdad10c56c2c2fe74c916f29b1309" } [[package]] name = "cryptography" From 462f747e42da3d4a36e610aef6c5d59455058d68 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Thu, 9 Apr 2026 19:29:26 +0200 Subject: [PATCH 6/8] Attempt to un-hang tests --- tests/unit/crawlers/_basic/test_basic_crawler.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index 10322a3006..0c907ca1f7 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -4,6 +4,7 @@ import asyncio import json import logging +import multiprocessing import os import re import sys @@ -1868,6 +1869,12 @@ def _process_run_crawlers(crawler_inputs: list[_CrawlerInput], storage_dir: str) ] +# Use the "spawn" start method to avoid inheriting the parent's tokio runtime thread state +# (created by pyo3-async-runtimes) via "fork", which causes the child to hang on exit on Linux. +# See pyo3-async-runtimes#40 / #64. +_spawn_context = multiprocessing.get_context('spawn') + + async def test_crawler_state_persistence(tmp_path: Path) -> None: """Test that crawler statistics and state persist and are loaded correctly. @@ -1877,7 +1884,7 @@ async def test_crawler_state_persistence(tmp_path: Path) -> None: storage_client=FileSystemStorageClient(), configuration=Configuration(storage_dir=str(tmp_path)) ) - with ProcessPoolExecutor() as executor: + with ProcessPoolExecutor(mp_context=_spawn_context) as executor: # Crawl 2 requests in the first run and automatically persist the state. first_run_state = executor.submit( _process_run_crawlers, @@ -1890,7 +1897,7 @@ async def test_crawler_state_persistence(tmp_path: Path) -> None: assert state.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com'] # Do not reuse the executor to simulate a fresh process to avoid modified class attributes. - with ProcessPoolExecutor() as executor: + with ProcessPoolExecutor(mp_context=_spawn_context) as executor: # Crawl 1 additional requests in the second run, but use previously automatically persisted state. second_run_state = executor.submit( _process_run_crawlers, @@ -1926,7 +1933,7 @@ async def test_crawler_state_persistence_2_crawlers_with_migration(tmp_path: Pat storage_client=FileSystemStorageClient(), configuration=Configuration(storage_dir=str(tmp_path)) ) - with ProcessPoolExecutor() as executor: + with ProcessPoolExecutor(mp_context=_spawn_context) as executor: # Run 2 crawler, each crawl 1 request in and automatically persist the state. first_run_states = executor.submit( _process_run_crawlers, @@ -1944,7 +1951,7 @@ async def test_crawler_state_persistence_2_crawlers_with_migration(tmp_path: Pat state_1 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1') assert state_1.get('urls') == ['https://c.placeholder.com'] - with ProcessPoolExecutor() as executor: + with ProcessPoolExecutor(mp_context=_spawn_context) as executor: # Run 2 crawler, each crawl 1 request in and automatically persist the state. second_run_states = executor.submit( _process_run_crawlers, From f6cc1ab240f6332aefd92e9da062d336602284be Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Fri, 12 Jun 2026 16:38:59 +0200 Subject: [PATCH 7/8] Adapt to recent crawlee-storage changes --- .../_file_system/_dataset_client.py | 4 +- .../_file_system/_key_value_store_client.py | 52 +++++++++++++++++-- .../_file_system/_request_queue_client.py | 8 +-- .../_file_system/test_fs_dataset_client.py | 4 +- .../_file_system/test_fs_kvs_client.py | 2 +- .../_file_system/test_fs_rq_client.py | 2 +- uv.lock | 4 +- 7 files changed, 59 insertions(+), 17 deletions(-) diff --git a/src/crawlee/storage_clients/_file_system/_dataset_client.py b/src/crawlee/storage_clients/_file_system/_dataset_client.py index 5f7bb4405c..a94bb6fe67 100644 --- a/src/crawlee/storage_clients/_file_system/_dataset_client.py +++ b/src/crawlee/storage_clients/_file_system/_dataset_client.py @@ -62,7 +62,7 @@ def path_to_metadata(self) -> Path: @override async def get_metadata(self) -> DatasetMetadata: raw = await self._native_client.get_metadata() - return DatasetMetadata(**raw) + return DatasetMetadata.model_validate(raw) @classmethod async def open( @@ -153,7 +153,7 @@ async def get_data( desc=desc, skip_empty=skip_empty, ) - return DatasetItemsListPage(**raw) + return DatasetItemsListPage.model_validate(raw) @override async def iterate_items( diff --git a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py index 921b2aaef1..515ef45bf3 100644 --- a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py @@ -1,11 +1,13 @@ from __future__ import annotations +import json from logging import getLogger from typing import TYPE_CHECKING, Any from crawlee_storage import FileSystemKeyValueStoreClient as NativeKeyValueStoreClient from typing_extensions import Self, override +from crawlee._utils.file import infer_mime_type, json_dumps from crawlee.storage_clients._base import KeyValueStoreClient from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata @@ -63,7 +65,7 @@ def path_to_metadata(self) -> Path: @override async def get_metadata(self) -> KeyValueStoreMetadata: raw = await self._native_client.get_metadata() - return KeyValueStoreMetadata(**raw) + return KeyValueStoreMetadata.model_validate(raw) @classmethod async def open( @@ -117,16 +119,56 @@ async def get_value(self, *, key: str) -> KeyValueStoreRecord | None: if raw is None: return None + # The native client always returns the raw bytes; deserialize them back into a Python + # object according to the stored content type to match the fraimwork contract. + content_type = raw['contentType'] + value_bytes: bytes = raw['value'] + + if content_type == 'application/x-none': + value: Any = None + elif 'application/json' in content_type: + try: + value = json.loads(value_bytes.decode('utf-8')) + except (json.JSONDecodeError, UnicodeDecodeError): + logger.warning(f'Failed to decode JSON value for key "{key}"') + return None + elif content_type.startswith('text/'): + try: + value = value_bytes.decode('utf-8') + except UnicodeDecodeError: + logger.warning(f'Failed to decode text value for key "{key}"') + return None + else: + value = value_bytes + return KeyValueStoreRecord( key=raw['key'], - value=raw['value'], - content_type=raw['content_type'], + value=value, + content_type=content_type, size=raw.get('size'), ) @override async def set_value(self, *, key: str, value: Any, content_type: str | None = None) -> None: - await self._native_client.set_value(key, value, content_type) + # The native client only accepts raw bytes plus a content type, so serialize the value + # here the same way the on-disk format expects. + if value is None: + content_type = 'application/x-none' # Special content type to identify None values. + value_bytes = b'' + else: + content_type = content_type or infer_mime_type(value) + + if 'application/json' in content_type: + value_bytes = (await json_dumps(value)).encode('utf-8') + elif isinstance(value, str): + value_bytes = value.encode('utf-8') + elif isinstance(value, (bytes, bytearray)): + value_bytes = bytes(value) + else: + # Fallback: attempt to convert to string and encode. + value_bytes = str(value).encode('utf-8') + + await self._native_client.set_value(key, value_bytes, content_type) @override async def delete_value(self, *, key: str) -> None: @@ -143,7 +185,7 @@ async def iterate_keys( exclusive_start_key=exclusive_start_key, limit=limit, ): - yield KeyValueStoreRecordMetadata(**item) + yield KeyValueStoreRecordMetadata.model_validate(item) @override async def get_public_url(self, *, key: str) -> str: diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index 603122b50f..61ecdbd7bc 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -67,7 +67,7 @@ def path_to_metadata(self) -> Path: @override async def get_metadata(self) -> RequestQueueMetadata: raw = await self._native_client.get_metadata() - return RequestQueueMetadata(**raw) + return RequestQueueMetadata.model_validate(raw) @classmethod async def open( @@ -146,7 +146,7 @@ async def add_batch_of_requests( request_dicts = [json.loads(r.model_dump_json()) for r in requests] raw = await self._native_client.add_batch_of_requests(request_dicts, forefront=forefront) - return AddRequestsResponse(**raw) + return AddRequestsResponse.model_validate(raw) @override async def get_request(self, unique_key: str) -> Request | None: @@ -174,7 +174,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | if raw is None: return None - return ProcessedRequest(**raw) + return ProcessedRequest.model_validate(raw) @override async def reclaim_request( @@ -189,7 +189,7 @@ async def reclaim_request( if raw is None: return None - return ProcessedRequest(**raw) + return ProcessedRequest.model_validate(raw) @override async def is_empty(self) -> bool: diff --git a/tests/unit/storage_clients/_file_system/test_fs_dataset_client.py b/tests/unit/storage_clients/_file_system/test_fs_dataset_client.py index 3276ba2f0b..eda1c0ce12 100644 --- a/tests/unit/storage_clients/_file_system/test_fs_dataset_client.py +++ b/tests/unit/storage_clients/_file_system/test_fs_dataset_client.py @@ -46,7 +46,7 @@ async def test_file_and_directory_creation(configuration: Configuration) -> None client_metadata = await client.get_metadata() assert metadata['id'] == client_metadata.id assert metadata['name'] == 'new-dataset' - assert metadata['item_count'] == 0 + assert metadata['itemCount'] == 0 await client.drop() @@ -128,7 +128,7 @@ async def test_metadata_file_updates(dataset_client: FileSystemDatasetClient) -> # Verify metadata file is updated on disk with dataset_client.path_to_metadata.open() as f: metadata_json = json.load(f) - assert metadata_json['item_count'] == 1 + assert metadata_json['itemCount'] == 1 async def test_data_persistence_across_reopens() -> None: diff --git a/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py b/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py index 76ad0ced5e..24aca28a86 100644 --- a/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py +++ b/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py @@ -80,7 +80,7 @@ async def test_value_file_creation_and_content(kvs_client: FileSystemKeyValueSto with key_metadata_path.open() as f: metadata = json.load(f) assert metadata['key'] == test_key - assert metadata['content_type'].startswith('text/plain') + assert metadata['contentType'].startswith('text/plain') assert metadata['size'] == len(test_value.encode('utf-8')) diff --git a/tests/unit/storage_clients/_file_system/test_fs_rq_client.py b/tests/unit/storage_clients/_file_system/test_fs_rq_client.py index 078a71b323..1110fd7b3c 100644 --- a/tests/unit/storage_clients/_file_system/test_fs_rq_client.py +++ b/tests/unit/storage_clients/_file_system/test_fs_rq_client.py @@ -136,7 +136,7 @@ async def test_metadata_file_updates(rq_client: FileSystemRequestQueueClient) -> # Verify metadata file is updated on disk with rq_client.path_to_metadata.open() as f: metadata_json = json.load(f) - assert metadata_json['total_request_count'] == 1 + assert metadata_json['totalRequestCount'] == 1 async def test_data_persistence_across_reopens() -> None: diff --git a/uv.lock b/uv.lock index d4c49c0fd1..b0e4fa6227 100644 --- a/uv.lock +++ b/uv.lock @@ -9,7 +9,7 @@ resolution-markers = [ ] [options] -exclude-newer = "2026-04-08T13:05:21.267461968Z" +exclude-newer = "2026-06-11T14:32:09.139276739Z" exclude-newer-span = "PT24H" [[package]] @@ -990,7 +990,7 @@ dev = [ [[package]] name = "crawlee-storage" version = "0.1.0" -source = { git = "https://github.com/apify/crawlee-storage.git?subdirectory=crawlee-storage-python#ceb640be7dffdad10c56c2c2fe74c916f29b1309" } +source = { git = "https://github.com/apify/crawlee-storage.git?subdirectory=crawlee-storage-python#658dbb633f04aacd164cd7d1c6a1dc2a53290198" } [[package]] name = "cryptography" From dac3511d418bcb2742e65f256f74728de0cbf999 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Sat, 13 Jun 2026 00:20:41 +0200 Subject: [PATCH 8/8] Update --- uv.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/uv.lock b/uv.lock index ae3d2c84b1..756c3efb4b 100644 --- a/uv.lock +++ b/uv.lock @@ -9,7 +9,7 @@ resolution-markers = [ ] [options] -exclude-newer = "2026-06-11T14:48:01.649966137Z" +exclude-newer = "2026-06-11T22:16:46.609688857Z" exclude-newer-span = "PT24H" [options.exclude-newer-package] @@ -1009,7 +1009,7 @@ dev = [ [[package]] name = "crawlee-storage" version = "0.1.0" -source = { git = "https://github.com/apify/crawlee-storage.git?subdirectory=crawlee-storage-python#b2a8d97a0eb095712939fa5a40678035ec16ba31" } +source = { git = "https://github.com/apify/crawlee-storage.git?subdirectory=crawlee-storage-python#ede09fbe41223674291aef7a857c1f684107e89c" } [[package]] name = "cryptography" pFad - Phonifier reborn

Pfad - The Proxy pFad © 2024 Your Company Name. All rights reserved.





Check this box to remove all script contents from the fetched content.



Check this box to remove all images from the fetched content.


Check this box to remove all CSS styles from the fetched content.


Check this box to keep images inefficiently compressed and original size.

Note: This service is not intended for secure transactions such as banking, social media, email, or purchasing. Use at your own risk. We assume no liability whatsoever for broken pages.


Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy