Skip to main content

Sets and availabilty

Step 1

Reformat plannedequipment into reservations per date

Processes equipment reservation data.

  • Reads reservation records from JSON files in the "plannedequipment" directory.
  • Reads equipment properties from JSON files in the "equipment" directory.

Calculates daily reservation totals for each equipment item.

  • Covers a date range: current month plus a configurable number of months (default 9).
  • Organizes data into a nested structure: year → month → day.

Computes daily available quantity: current quantity minus reservations.
Outputs results to JSON files in the "reservationperdate" directory.
Offers optional features:

  • Suppression of days with zero reservations.
  • Prevention of file creation when no reservation data exists.

Script

import os
import json
from datetime import datetime, date, timedelta
import calendar

# Parameters
SUPPRESS_ZERO = True           # If True, skip storing day nodes with zero reservations.
SUPPRESS_EMPTY_FILE = True     # If True, do not create an output file if the "dates" part is empty.
ADDITIONAL_MONTHS = 9          # Number of additional months to process (beyond the current month)

# Define the directories
plannedequipment_dir = 'plannedequipment'
equipment_dir = 'equipment'
output_dir = 'reservationperdate'
os.makedirs(output_dir, exist_ok=True)

# Define the date range: from the first day of this month to the last day of the month ADDITIONAL_MONTHS later.
today = date.today()
start_date = date(today.year, today.month, 1)
end_month = today.month + ADDITIONAL_MONTHS
end_year = today.year + (end_month - 1) // 12
end_month = ((end_month - 1) % 12) + 1
last_day = calendar.monthrange(end_year, end_month)[1]
end_date = date(end_year, end_month, last_day)

# Define the list of equipment property keys to copy from the equipment JSON files.
equipment_keys = ["is_combination", "is_physical", "current_quantity"]

def add_day(nested_dates, current_date):
    """Ensure that nested_dates[year][month][day] exists and initialize it."""
    year = str(current_date.year)
    month = f"{current_date.month:02d}"
    day = f"{current_date.day:02d}"
    
    if year not in nested_dates:
        nested_dates[year] = {}
    if month not in nested_dates[year]:
        nested_dates[year][month] = {}
    if day not in nested_dates[year][month]:
        nested_dates[year][month][day] = {"reservations": 0, "available": None}

def remove_empty_containers(nested_dates):
    """Remove empty month and year nodes if they have no day entries."""
    years_to_remove = []
    for year, months in nested_dates.items():
        months_to_remove = []
        for month, days in months.items():
            if not days:  # If month dictionary is empty
                months_to_remove.append(month)
        for month in months_to_remove:
            del nested_dates[year][month]
        if not nested_dates[year]:
            years_to_remove.append(year)
    for year in years_to_remove:
        del nested_dates[year]

# Process each JSON file in the plannedequipment directory.
for filename in os.listdir(plannedequipment_dir):
    if not filename.endswith('.json'):
        continue

    # Build full paths for plannedequipment, equipment and output files.
    plan_file_path = os.path.join(plannedequipment_dir, filename)
    equipment_file_path = os.path.join(equipment_dir, filename)
    output_path = os.path.join(output_dir, filename)

    # Load the reservation data from the plannedequipment file.
    with open(plan_file_path, 'r', encoding='utf-8') as infile:
        plan_data = json.load(infile)

    # Initialize the nested dates dictionary.
    nested_dates = {}
    current_date = start_date
    while current_date <= end_date:
        add_day(nested_dates, current_date)
        current_date += timedelta(days=1)

    # Process each reservation record.
    for record in plan_data.get('data', []):
        start_str = record.get('planperiod_start')
        end_str = record.get('planperiod_end')
        if not start_str or not end_str:
            continue

        record_start = datetime.fromisoformat(start_str).date()
        record_end = datetime.fromisoformat(end_str).date()
        # Clamp the reservation period to our target range.
        period_start = max(record_start, start_date)
        period_end = min(record_end, end_date)
        if period_start > period_end:
            continue

        quantity = record.get('quantity', 0)
        current = period_start
        while current <= period_end:
            year = str(current.year)
            month = f"{current.month:02d}"
            day = f"{current.day:02d}"
            if year in nested_dates and month in nested_dates[year] and day in nested_dates[year][month]:
                nested_dates[year][month][day]["reservations"] += quantity
            current += timedelta(days=1)

    # Initialize equipment properties.
    item_properties = {}
    current_quantity = 0

    # Read the corresponding equipment file.
    if os.path.exists(equipment_file_path):
        with open(equipment_file_path, 'r', encoding='utf-8') as eq_file:
            equipment_data = json.load(eq_file)
        equipment_item = equipment_data.get('data', {})
        # Copy the desired properties.
        for key in equipment_keys:
            item_properties[key] = equipment_item.get(key, None)
        current_quantity = equipment_item.get('current_quantity', 0)
    else:
        print(f"Error: Equipment file '{filename}' not found in '{equipment_dir}'. Assuming current_quantity = 0 and default item properties.")
        for key in equipment_keys:
            item_properties[key] = None

    # Update each day with the available quantity and optionally suppress days with zero reservations.
    for year in list(nested_dates.keys()):
        for month in list(nested_dates[year].keys()):
            for day in list(nested_dates[year][month].keys()):
                day_data = nested_dates[year][month][day]
                reservations = day_data["reservations"]
                day_data["available"] = current_quantity - reservations
                if SUPPRESS_ZERO and reservations == 0:
                    del nested_dates[year][month][day]
            # If a month becomes empty after removing days, remove the month.
            if not nested_dates[year][month]:
                del nested_dates[year][month]
        # If a year becomes empty after removing months, remove the year.
        if not nested_dates[year]:
            del nested_dates[year]

    # If SUPPRESS_EMPTY_FILE is True and the nested dates dictionary is empty,
    # remove the output file if it exists and skip writing.
    if SUPPRESS_EMPTY_FILE and not nested_dates:
        if os.path.exists(output_path):
            os.remove(output_path)
            print(f"Removed existing file {output_path} because there are no reservations.")
        continue

    # Combine the item properties and the nested date reservation data.
    output_data = {
        "item": item_properties,
        "dates": nested_dates
    }

    # Write the result to the output directory.
    with open(output_path, 'w', encoding='utf-8') as outfile:
        json.dump(output_data, outfile, indent=2)

print("Reservation, availability, and item data have been processed and saved.")

Step 2

Download the set hiarchy from the API

(takes long, build in delays to prevent throttling)

import json
import time
import requests
import sys

# Configuration variables (adjust these as needed)
BASE_URL = "https://api.rentman.net/"  # Replace with your actual base URL

FILTER_ON_TAGS = False  # Set to True if you want to apply tag filtering
API_TOKEN = "abc"  # Replace with your actual API token

def log_message(message, force=False):
    """Logs a message. You can modify this to log to a file if needed."""
    sys.stderr.write(message + "\n")
    if force:
        print(message)

def count_api_usage():
    """Stub function for counting API usage."""
    # Extend this function to track API usage if needed.
    pass

def filter_array_by_selections(items, selections):
    """
    Filters an array of dictionaries based on the presence of certain keys with truthy values.

    :param items: List of dictionaries to filter.
    :param selections: List of keys to check.
    :return: Filtered list of dictionaries.
    """
    filtered = []
    for item in items:
        for sel in selections:
            if item.get(sel):
                filtered.append(item)
                break
    return filtered

def make_request(method, endpoint, data=None):
    """
    Makes an HTTP request using the requests library.

    :param method: HTTP method ('GET' or 'POST').
    :param endpoint: API endpoint (appended to BASE_URL).
    :param data: Dictionary of data or query parameters.
    :return: Parsed JSON response or None if error occurs.
    """
    if data is None:
        data = {}

    url = BASE_URL + endpoint
    headers = {
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json"
    }

    try:
        if method.upper() == 'POST':
            response = requests.post(url, headers=headers, json=data)
        elif method.upper() == 'GET':
            response = requests.get(url, headers=headers, params=data)
        else:
            log_message(f"Unsupported HTTP method: {method}")
            return None
    except requests.RequestException as e:
        log_message(f"Request exception: {e}")
        return None

    if not response.ok:
        log_message(f"HTTP error: {response.status_code} | Response Body: {response.text}")
    try:
        return response.json()
    except json.JSONDecodeError:
        log_message("Error decoding JSON response.")
        return None

def get_request(endpoint, params=None):
    """
    Performs a GET request and counts API usage.

    :param endpoint: API endpoint.
    :param params: Query parameters as a dictionary.
    :return: Parsed JSON response.
    """
    count_api_usage()
    return make_request('GET', endpoint, params)

def equipmentsetscontent(filter_on_tags=FILTER_ON_TAGS):
    """
    Retrieves equipment sets content by fetching data in batches.

    :param filter_on_tags: If True, applies tag filtering.
    :return: Complete list of items.
    """
    all_items = []
    offset = 0
    batch_size = 300
    total_fetched = 0

    while True:
        params = {'offset': offset, 'limit': batch_size}
        response = get_request('equipmentsetscontent', params)

        if not response or 'data' not in response:
            print("<pre><h1>Error in API Request</h1>")
            print(response)
            sys.exit(1)

        items = response['data']
        all_items.extend(items)
        total_fetched += len(items)

        # If fewer than batch_size items are returned, no more records are available.
        if len(items) < batch_size:
            break

        offset += batch_size

        # If 3000 records have been fetched, pause for 1 second to avoid rate limits.
        if total_fetched >= 3000:
            log_message("Pausing for 1 second to avoid rate limits...", True)
            time.sleep(1)
            total_fetched = 0  # Reset the counter after the pause

    if filter_on_tags:
        all_items = filter_array_by_selections(all_items, ['hoofditem', 'set'])

    return all_items

# Main execution
if __name__ == "__main__":
    complete_data = equipmentsetscontent()
    print(json.dumps(complete_data, indent=4))

Step 3,

Convert the output from step 2 into sets-childs format (parrent with list of childs).

import json
from collections import defaultdict

# Define file paths
input_file = "sets/sets.json"
output_file = "sets/sets-childs.json"

def strip_prefix(equipment_str):
    prefix = "/equipment/"
    return equipment_str[len(prefix):] if equipment_str.startswith(prefix) else equipment_str

# Load the input JSON file
with open(input_file, "r") as infile:
    data = json.load(infile)

# Group child items by parent equipment (after stripping the prefix)
sets_dict = defaultdict(list)
for entry in data:
    parent = strip_prefix(entry["parent_equipment"])
    child_item = {
        "equipment": strip_prefix(entry["equipment"]),
        "quantity": entry["quantity"]
    }
    sets_dict[parent].append(child_item)

# Sort the dictionary by parent keys in ascending numerical order
sorted_sets_dict = {k: sets_dict[k] for k in sorted(sets_dict.keys(), key=lambda x: int(x))}

# Write the output JSON file with sorted parent keys
with open(output_file, "w") as outfile:
    json.dump(sorted_sets_dict, outfile, indent=4)

print("The new JSON file has been created successfully.")

Step 4

Calculate set availability into reservationperdate-sets folder.

Prompt is attached: prompt to create set reservations.docx

#!/usr/bin/env python3
import os
import json
import math
from collections import defaultdict

# Directories for input and output files
SETS_FILE = os.path.join("sets", "sets-childs.json")
RESERVATION_DIR = "reservationperdate"
OUTPUT_DIR = "reservationperdate-sets"

# Ensure output directory exists
os.makedirs(OUTPUT_DIR, exist_ok=True)

def load_json(filepath):
    with open(filepath, "r") as f:
        return json.load(f)

def get_child_reservation(child_id):
    """
    Given a child equipment id, load its reservation file.
    Assumes filename is child_id + ".json" in the RESERVATION_DIR.
    Returns a dict with keys "item" and optionally "dates".
    """
    filepath = os.path.join(RESERVATION_DIR, f"{child_id}.json")
    if os.path.exists(filepath):
        return load_json(filepath)
    else:
        raise FileNotFoundError(f"Reservation file for equipment {child_id} not found at {filepath}")

def extract_date_availability(reservation):
    """
    Extract a mapping from (year, month, day) tuple to available quantity.
    If a reservation file has no "dates" node, returns an empty dict.
    """
    date_avail = {}
    dates = reservation.get("dates", {})
    for year, months in dates.items():
        for month, days in months.items():
            for day, data in days.items():
                date_avail[(year, month, day)] = data.get("available", 0)
    return date_avail

def nested_date_structure(date_dict):
    """
    Given a mapping with keys (year, month, day) and value (availability),
    build a nested dictionary: year -> month -> day -> {reservations: 0, available: value}
    """
    nested = defaultdict(lambda: defaultdict(dict))
    for (year, month, day), avail in date_dict.items():
        nested[year][month][day] = {"reservations": 0, "available": avail}
    return {year: dict(months) for year, months in nested.items()}

def main():
    # Load the sets definitions
    sets_def = load_json(SETS_FILE)
    
    # Process each set defined in sets-childs.json
    for set_id, children in sets_def.items():
        child_infos = []  # List of tuples: (child_id, required_quantity, full_stock, date_availability mapping)
        for child in children:
            child_id = child["equipment"]
            req_qty = child["quantity"]
            try:
                reservation = get_child_reservation(child_id)
            except FileNotFoundError as e:
                print(e)
                continue  # Skip this child if its reservation file is missing
            full_stock = reservation.get("item", {}).get("current_quantity", 0)
            date_avail = extract_date_availability(reservation)
            child_infos.append((child_id, req_qty, full_stock, date_avail))
        
        # Calculate the full availability (without date restrictions)
        available_sets_full = math.inf
        for (_, req_qty, full_stock, _) in child_infos:
            child_full_sets = full_stock // req_qty
            available_sets_full = min(available_sets_full, child_full_sets)
        if available_sets_full == math.inf:
            available_sets_full = 0

        # Gather the union of all dates from the children
        union_dates = set()
        for (_, _, full_stock, date_avail) in child_infos:
            union_dates.update(date_avail.keys())
        
        # Compute availability per date from the union of dates
        date_results = {}
        for date in union_dates:
            avail_for_date = math.inf
            for (_, req_qty, full_stock, date_avail) in child_infos:
                # Use date-specific available if provided, otherwise use full stock
                child_avail = date_avail.get(date, full_stock)
                child_sets = child_avail // req_qty
                avail_for_date = min(avail_for_date, child_sets)
            date_results[date] = avail_for_date
        
        # Only include dates where the available set count is lower than full availability
        filtered_dates = {date: avail for date, avail in date_results.items() if avail < available_sets_full}

        # Build output JSON structure following the reservation file structure
        output = {}
        output["item"] = {
            "is_combination": True,
            "current_quantity": available_sets_full
        }
        if filtered_dates:
            output["dates"] = nested_date_structure(filtered_dates)
        
        # Write the output JSON file for the set to the OUTPUT_DIR
        out_filepath = os.path.join(OUTPUT_DIR, f"{set_id}.json")
        with open(out_filepath, "w") as outfile:
            json.dump(output, outfile, indent=4)
        print(f"Set {set_id}: full availability {available_sets_full} written to {out_filepath}")

if __name__ == "__main__":
    main()