Installation

pickleDB is available on PyPI. Install it with pip in just one command:

pip install pickledb

Your first pickleDB

Open up your favorite terminal and start a Python shell. Enter the following commands and watch pickleDB go to work!

from pickledb import PickleDB

db = PickleDB('my_database.db')  # Initialize the database

db.set('greeting', 'Hello, world!')  # Add a key-value pair

db.get('greeting')  # Retrieve the value. Output: 'Hello, world!'

db.save()  # Save the data to disk

You should now have a file called my_database.db in your working directory. In it you should see the key-value pair you just added. It’s that simple! In just a few lines, you have a fully functioning key-value store.

Add or update a key-value pair

We can add key-value pairs to our database with the set(key, value) method provided in the PickleDB class for synchronous operations:

# Add a new key-value pair
db.set('username', 'admin')

# Or shorthand
db['username'] = 'admin'

# Update an existing key-value pair
db.set('username', 'superadmin')
db.get('username')  # Output: 'superadmin'

Your key must be a string. If it is not, pickleDB will convert it into one for you using str(). Your value can be any JSON serializable object.
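If you are unsure whether a value will survive a save, you can check it before storing it. The sketch below uses the standard library json module as an approximation; pickleDB's own serializer may accept a slightly different set of types, and is_json_serializable is a hypothetical helper name, not part of pickleDB. The last line shows why non-string keys deserve care: after str() conversion, the integer 1 and the string '1' refer to the same key.

```python
import json

def is_json_serializable(value):
    """Return True if the standard-library json module can encode value."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False

# Dicts, lists, strings, numbers, booleans and None are all JSON types.
print(is_json_serializable({'name': 'Alice', 'tags': ['a', 'b']}))  # True

# Sets and bytes are not JSON types.
print(is_json_serializable({1, 2, 3}))     # False
print(is_json_serializable(b'raw bytes'))  # False

# str() maps the integer 1 and the string '1' to the same key.
print(str(1) == '1')  # True
```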

Retrieve the value associated with a key

In order to get the value of a key we can use PickleDB's get(key) method. Remember your key must be a string.

# Get the value for a key
print(db.get('username'))  # Output: 'superadmin'

# Like the set() method, you can use Python syntax sugar here as well
db['username']  # Output: 'superadmin'

# Attempt to retrieve a non-existent key
db.get('nonexistent_key')  # Output: None

Get a list of all keys

It is often useful to get a list of all the keys in your database. To do this we can use the all() method:

# Add multiple keys
db.set('item1', 'value1')
db.set('item2', 'value2')

# Retrieve all keys
db.all()  # Output: ['greeting', 'username', 'item1', 'item2']

Delete a key and its value

To delete a key from the database you can use the remove(key) method. This will delete the key and its associated value.

# Remove a key-value pair
db.remove('item1')
db.all()  # Output: ['greeting', 'username', 'item2']

As you can see, when we call the all() method now, it no longer shows the key we removed.

Delete all keys

You can delete all data in the database using the purge() method in the PickleDB class.

db.purge()

db.all()  # Output: []

Persist the database to disk

None of the changes made by the methods above are permanent until you call the save() method. Saving to disk only happens when this method is called. If you add a bunch of key-value pairs to your database and the program exits without calling save(), those changes will be lost! This explicit approach lets you decide how often to write to disk, so you can tune the performance of pickleDB to your needs.

db.save()  # Output: True

You can also save the database using a context manager:

with db:
    db.set('foo', 'bar')
    db.set('hello', 'world')
    db.set('candy', 'bad')

# Database saved when the `with` statement exits.
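Because saving is explicit, one way to trade durability for speed is to save only after every few writes. The sketch below is one possible approach, not something pickleDB provides: SaveEveryN is a hypothetical wrapper, and InMemoryDB is a stand-in with the same set() and save() method names so the snippet runs without a database file.

```python
class SaveEveryN:
    """Hypothetical wrapper: call save() on the wrapped db after every n set() calls."""

    def __init__(self, db, n):
        self.db = db
        self.n = n
        self.pending = 0  # writes made since the last save

    def set(self, key, value):
        self.db.set(key, value)
        self.pending += 1
        if self.pending >= self.n:
            self.flush()

    def flush(self):
        """Persist any pending writes; call this once more before exiting."""
        if self.pending:
            self.db.save()
            self.pending = 0


class InMemoryDB:
    """Stand-in exposing set() and save() so the sketch runs without pickleDB."""

    def __init__(self):
        self.data = {}
        self.save_calls = 0

    def set(self, key, value):
        self.data[str(key)] = value
        return True

    def save(self):
        self.save_calls += 1
        return True


backend = InMemoryDB()  # in real code, pass a PickleDB instance instead
writer = SaveEveryN(backend, n=3)

for i in range(7):
    writer.set(f'key{i}', i)
writer.flush()  # persist the final partial batch

print(backend.save_calls)  # 3 (after the 3rd write, the 6th write, and the final flush)
```

A larger n means fewer disk writes but more data at risk if the program dies between saves.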

The AsyncPickleDB Class

The PickleDB class is great for simple, fast data storage, but it is neither thread safe nor asynchronous, which makes it ill-suited to things like ASGI frameworks. When you need async, use the AsyncPickleDB class instead. It works almost the same way as the class we have been using. Let's take a look:

import asyncio
from pickledb import AsyncPickleDB

async def main():
    # Initialize the AsyncPickleDB object
    db = AsyncPickleDB('my_database.db')

    # Add a key-value pair
    await db.aset('greeting', 'Hello, world!')

    # Retrieve the value
    greeting = await db.aget('greeting')
    print(greeting)  # Output: 'Hello, world!'

    # Save the data to disk
    await db.asave()

asyncio.run(main())

To use AsyncPickleDB, prefix each synchronous method name you already know with an "a" and await the call:

await db.aset('foo', 'bar')

await db.aget('foo')

await db.aremove('foo')

await db.aall()

await db.apurge()

await db.asave()

Store and Retrieve Complex Data

PickleDB works seamlessly with Python data structures. Example:

# Store a dictionary
db.set('user', {'name': 'Alice', 'age': 30, 'city': 'Wonderland'})

# Retrieve and update it
user = db.get('user')
user['age'] += 1

# Save the updated data
db.set('user', user)
print(db.get('user'))  # Output: {'name': 'Alice', 'age': 31, 'city': 'Wonderland'}

Use Lists for Dynamic Data

Handle lists with ease:

# Add a list of items
db.set('tasks', ['Write code', 'Test app', 'Deploy'])

# Retrieve and modify
tasks = db.get('tasks')
tasks.append('Celebrate')
db.set('tasks', tasks)

print(db.get('tasks'))  # Output: ['Write code', 'Test app', 'Deploy', 'Celebrate']

Advanced Key Search

Search the database with the full power of Python:

# Create simple helper methods based on what YOU need
def get_keys_with_match_list(db_instance, match):
    return [key for key in db_instance.all() if match in key]

def get_keys_with_match_dict(db_instance, match):
    return dict(filter(lambda item: match in item[0], db_instance.db.items()))

# Create an instance of PickleDB
db = PickleDB("example.json")

# Add key-value pairs
db.set('apple', 1)
db.set('apricot', 2)
db.set('banana', 3)

# Use a substring search to return a list of keys
matching_keys = get_keys_with_match_list(db, 'ap')
print(matching_keys)  # Output: ['apple', 'apricot']

# Use a substring search to return a dict of keys and values
matching_dict = get_keys_with_match_dict(db, 'ap')
print(matching_dict)  # Output: {'apple': 1, 'apricot': 2}
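The helpers above test whether the match string appears anywhere in the key. If you want shell-style wildcards instead, the standard library fnmatch module can do the matching. This is a minimal sketch that works on a plain list of keys, such as the one returned by all(); get_keys_by_glob is a hypothetical helper name.

```python
from fnmatch import fnmatchcase

def get_keys_by_glob(keys, pattern):
    """Return the keys matching a shell-style pattern (*, ?, [seq]), case-sensitively."""
    return [key for key in keys if fnmatchcase(key, pattern)]

keys = ['apple', 'apricot', 'banana', 'grape']  # e.g. the result of db.all()

print(get_keys_by_glob(keys, 'ap*'))    # ['apple', 'apricot']
print(get_keys_by_glob(keys, '*ap*'))   # ['apple', 'apricot', 'grape']
print(get_keys_by_glob(keys, '?????'))  # ['apple', 'grape']
```

Note that 'ap*' only matches keys that start with "ap", while a plain substring test for 'ap' would also return 'grape'.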

Namespace Support

If you use prefixes to simulate namespaces, you can manage groups of keys more efficiently:

# Set multiple keys with a namespace
db.set('user:1', {'name': 'Alice', 'age': 30})
db.set('user:2', {'name': 'Bob', 'age': 25})

# Get all keys in a namespace
def get_namespace_keys(db_instance, namespace):
    return [key for key in db_instance.all() if key.startswith(f"{namespace}:")]

user_keys = get_namespace_keys(db, 'user')
print(user_keys)  # Output: ['user:1', 'user:2']
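When several namespaces share one database, it can help to see every group at once. The sketch below groups a plain list of keys by the text before the first separator; group_keys_by_namespace is a hypothetical helper, and filing keys without a separator under the empty string is just one possible convention.

```python
from collections import defaultdict

def group_keys_by_namespace(keys, separator=':'):
    """Group keys by the prefix before the first separator."""
    groups = defaultdict(list)
    for key in keys:
        namespace, found, _rest = key.partition(separator)
        # Keys that contain no separator are grouped under the empty string.
        groups[namespace if found else ''].append(key)
    return dict(groups)

keys = ['user:1', 'user:2', 'admin:1', 'greeting']  # e.g. the result of db.all()
print(group_keys_by_namespace(keys))
# {'user': ['user:1', 'user:2'], 'admin': ['admin:1'], '': ['greeting']}
```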

Expire Keys

Manually simulate a basic TTL (time-to-live) mechanism for expiring keys:

import time

# Set a key with an expiration time
def set_with_expiry(db_instance, key, value, ttl):
    db_instance.set(key, {'value': value, 'expires_at': time.time() + ttl})

# Get a key only if it hasn't expired
def get_if_not_expired(db_instance, key):
    data = db_instance.get(key)
    if data and time.time() < data['expires_at']:
        return data['value']
    db_instance.remove(key)  # Remove expired key
    return None

# Example usage
set_with_expiry(db, 'session_123', 'active', ttl=10)
time.sleep(5)
print(get_if_not_expired(db, 'session_123'))  # Output: 'active'
time.sleep(6)
print(get_if_not_expired(db, 'session_123'))  # Output: None
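The helper above only removes an expired key when someone asks for it, so keys that are never read again stay in the database. A periodic sweep can find them. The sketch below assumes the same record layout as set_with_expiry (a dict with an 'expires_at' timestamp) and works on a plain dict of records; expired_keys is a hypothetical helper name.

```python
import time

def expired_keys(records, now=None):
    """Return the keys whose record carries an 'expires_at' at or before now."""
    now = time.time() if now is None else now
    return [
        key for key, record in records.items()
        if isinstance(record, dict)
        and 'expires_at' in record
        and record['expires_at'] <= now
    ]

records = {
    'session_1': {'value': 'active', 'expires_at': 100.0},
    'session_2': {'value': 'active', 'expires_at': 300.0},
    'greeting': 'Hello, world!',  # no expiry metadata, so never swept
}

print(expired_keys(records, now=200.0))  # ['session_1']
```

With a real database, you could build records as {key: db.get(key) for key in db.all()} and then call db.remove(key) for each key returned.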

Encrypted Storage

Use encryption for secure storage of sensitive data:

from cryptography.fernet import Fernet

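# Initialize encryption (store this key somewhere safe; without it the data cannot be decrypted)
key = Fernet.generate_key()
cipher = Fernet(key)

# Encrypt and save data. Fernet tokens are bytes, which are not JSON
# serializable, so decode the token to a string before storing it.
encrypted_value = cipher.encrypt(b"My secret data")
db.set('secure_key', encrypted_value.decode())

# Retrieve and decrypt data
encrypted_value = db.get('secure_key')
decrypted_value = cipher.decrypt(encrypted_value.encode())
print(decrypted_value.decode())  # Output: My secret data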

Batch Operations

Add multiple key-value pairs with a single helper call:

def batch_set(db_instance, items):
    for key, value in items.items():
        db_instance.set(key, value)

# Add multiple keys
batch_set(db, {'key1': 'value1', 'key2': 'value2', 'key3': 'value3'})
print(db.all())  # Output (starting from an empty database): ['key1', 'key2', 'key3']

# Delete multiple key-value pairs in a single operation:
def batch_delete(db_instance, keys):
    for key in keys:
        db_instance.remove(key)

# Example usage
db.set('temp1', 'value1')
db.set('temp2', 'value2')
batch_delete(db, ['temp1', 'temp2'])
print(db.all())  # 'temp1' and 'temp2' are gone; any other keys are unchanged

Key Pattern Matching

Support complex matching patterns using regular expressions:

import re

# Get keys that match a regex pattern
def get_keys_by_pattern(db_instance, pattern):
    regex = re.compile(pattern)
    return [key for key in db_instance.all() if regex.search(key)]

# Example usage
db.set('user:1', {'name': 'Alice'})
db.set('user:2', {'name': 'Bob'})
db.set('admin:1', {'name': 'Charlie'})
matching_keys = get_keys_by_pattern(db, r'user:\d')
print(matching_keys)  # Output: ['user:1', 'user:2']
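Keep in mind that search() matches anywhere inside the key, so a pattern like user:\d also matches a key such as 'superuser:3'. If you want the whole key to match, fullmatch() is one option. The sketch below compares the two on a plain list of keys; the key names are made up for illustration.

```python
import re

keys = ['user:1', 'user:22', 'superuser:3', 'admin:1']  # e.g. the result of db.all()

loose = re.compile(r'user:\d')    # matches anywhere in the key
strict = re.compile(r'user:\d+')  # used with fullmatch(), must cover the whole key

loose_matches = [key for key in keys if loose.search(key)]
strict_matches = [key for key in keys if strict.fullmatch(key)]

print(loose_matches)   # ['user:1', 'user:22', 'superuser:3']
print(strict_matches)  # ['user:1', 'user:22']
```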

Adding Custom Signal Handling

You can easily implement custom signal handling in your application to ensure graceful shutdowns and data persistence during unexpected terminations. Below is an example of how to integrate custom signal handling with pickleDB:

import signal
import sys
import time
from pickledb import PickleDB

# Initialize the PickleDB instance
db = PickleDB('my_database.db')

# Save the database, then exit cleanly
def save_and_exit(signum, frame):
    db.save()
    sys.exit(0)

# Register the handler for SIGINT (Ctrl+C) and SIGTERM (system termination)
signal.signal(signal.SIGINT, save_and_exit)
signal.signal(signal.SIGTERM, save_and_exit)

# Example usage
db.set('key1', 'value1')
db.set('key2', 'value2')

print("Database is running. Press Ctrl+C to save and exit.")

# Keep the program running without spinning the CPU
while True:
    time.sleep(1)

Async For Web Frameworks

For frameworks like FastAPI, Starlette, or MicroPie, use AsyncPickleDB to handle requests without blocking the server:

from uuid import uuid4
import asyncio
from MicroPie import App
from markupsafe import escape
from pickledb import AsyncPickleDB

db = AsyncPickleDB('pastes.db')

class Root(App):

    async def index(self):
        if self.request.method == "POST":
            paste_content = self.request.body_params.get('paste_content', [''])[0]
            pid = str(uuid4())
            await db.aset(pid, escape(paste_content))
            await db.asave()
            return self._redirect(f'/paste/{pid}')
        return await self._render_template('index.html')

    async def paste(self, paste_id, delete=None):
        if delete == 'delete':
            await db.aremove(paste_id)
            await db.asave()
            return self._redirect('/')
        paste_content = await db.aget(paste_id)
        return await self._render_template(
            'paste.html',
            paste_id=paste_id,
            paste_content=paste_content
        )

app = Root()