Source code for requests_cache.backends.dynamodb

"""DynamoDB cache backend. For usage details, see :ref:`Backends: DynamoDB <dynamodb>`.

.. automodsumm:: requests_cache.backends.dynamodb
   :classes-only:
   :nosignatures:
"""
from typing import Iterable, Optional

import boto3
from boto3.dynamodb.types import Binary
from boto3.resources.base import ServiceResource
from botocore.exceptions import ClientError

from requests_cache.backends.base import VT

from .._utils import get_valid_kwargs
from ..serializers import SerializerType, dynamodb_document_serializer
from . import BaseCache, BaseStorage, DictStorage


[docs]class DynamoDbCache(BaseCache): """DynamoDB cache backend. By default, responses are only partially serialized into a DynamoDB-compatible document format. Args: table_name: DynamoDB table name connection: :boto3:`DynamoDB Resource <services/dynamodb/service-resource/index.html#service-resource>` object to use instead of creating a new one ttl: Use DynamoDB TTL to automatically remove expired items kwargs: Additional keyword arguments for :py:meth:`~boto3.session.Session.resource` """ def __init__( self, table_name: str = 'http_cache', *, ttl: bool = True, connection: Optional[ServiceResource] = None, decode_content: bool = True, serializer: Optional[SerializerType] = None, **kwargs, ): super().__init__(cache_name=table_name, **kwargs) skwargs = {'serializer': serializer, **kwargs} if serializer else kwargs self.responses = DynamoDbDict( table_name, ttl=ttl, connection=connection, decode_content=decode_content, **skwargs, ) # Redirects will be only stored in memory and not persisted self.redirects: BaseStorage[str, str] = DictStorage()
[docs]class DynamoDbDict(BaseStorage): """A dictionary-like interface for DynamoDB table Args: table_name: DynamoDB table name connection: :boto3:`DynamoDB Resource <services/dynamodb/service-resource/index.html#service-resource>` object to use instead of creating a new one ttl: Use DynamoDB TTL to automatically remove expired items kwargs: Additional keyword arguments for :py:meth:`~boto3.session.Session.resource` """ def __init__( self, table_name: str, ttl: bool = True, connection: Optional[ServiceResource] = None, serializer: Optional[SerializerType] = dynamodb_document_serializer, **kwargs, ): super().__init__(serializer=serializer, **kwargs) connection_kwargs = get_valid_kwargs( boto3.Session.__init__, kwargs, extras=['endpoint_url'] ) self.connection = connection or boto3.resource('dynamodb', **connection_kwargs) self.table_name = table_name self.ttl = ttl self._table = self.connection.Table(self.table_name) self._create_table() if ttl: self._enable_ttl() def _create_table(self): """Create a default table if one does not already exist""" try: self.connection.create_table( AttributeDefinitions=[ {'AttributeName': 'key', 'AttributeType': 'S'}, ], TableName=self.table_name, KeySchema=[ {'AttributeName': 'key', 'KeyType': 'HASH'}, ], BillingMode='PAY_PER_REQUEST', ) self._table.wait_until_exists() # Ignore error if table already exists except ClientError as e: if e.response['Error']['Code'] != 'ResourceInUseException': raise def _enable_ttl(self): """Enable TTL, if not already enabled""" try: self.connection.meta.client.update_time_to_live( TableName=self.table_name, TimeToLiveSpecification={'AttributeName': 'ttl', 'Enabled': True}, ) # Ignore error if TTL is already enabled except ClientError as e: if e.response['Error']['Code'] != 'ValidationException': raise def __getitem__(self, key): result = self._table.get_item(Key={'key': key}) if 'Item' not in result: raise KeyError return self.deserialize(key, result['Item']['value']) def __setitem__(self, key, value): item = {'key': key, 'value': self.serialize(value)} # If enabled, set TTL value as a timestamp in unix format if self.ttl and getattr(value, 'expires_unix', None): item['ttl'] = value.expires_unix self._table.put_item(Item=item) def __delitem__(self, key): response = self._table.delete_item(Key={'key': key}, ReturnValues='ALL_OLD') if 'Attributes' not in response: raise KeyError def __iter__(self): # Alias 'key' attribute since it's a reserved keyword results = self._table.scan( ProjectionExpression='#k', ExpressionAttributeNames={'#k': 'key'}, ) for item in results['Items']: yield item['key'] def __len__(self): """Get the number of items in the table. **Note:** This is an estimate, and is updated every 6 hours. A full table scan will use up your provisioned throughput, so it's not recommended. """ return self._table.item_count
[docs] def bulk_delete(self, keys: Iterable[str]): """Delete multiple keys from the cache. Does not raise errors for missing keys.""" with self._table.batch_writer() as batch: for key in keys: batch.delete_item(Key={'key': key})
[docs] def clear(self): self.bulk_delete((k for k in self))
[docs] def deserialize(self, key, value: VT): """Handle Binary objects from a custom serializer""" serialized_value = value.value if isinstance(value, Binary) else value return super().deserialize(key, serialized_value)
# TODO: Support pagination
[docs] def values(self): for item in self._table.scan()['Items']: yield self.deserialize(item['key'], item['value'])