Sets and availabilty
Step 1
Reformat plannedequipment into reservations per date
Processes equipment reservation data.
- Reads reservation records from JSON files in the "plannedequipment" directory.
- Reads equipment properties from JSON files in the "equipment" directory.
Calculates daily reservation totals for each equipment item.
- Covers a date range: current month plus a configurable number of months (default 9).
- Organizes data into a nested structure: year → month → day.
Computes daily available quantity: current quantity minus reservations.
Outputs results to JSON files in the "reservationperdate" directory.
Offers optional features:
- Suppression of days with zero reservations.
- Prevention of file creation when no reservation data exists.
Script
import os
import json
from datetime import datetime, date, timedelta
import calendar
# Parameters
SUPPRESS_ZERO = True # If True, skip storing day nodes with zero reservations.
SUPPRESS_EMPTY_FILE = True # If True, do not create an output file if the "dates" part is empty.
ADDITIONAL_MONTHS = 9 # Number of additional months to process (beyond the current month)
# Define the directories
plannedequipment_dir = 'plannedequipment'
equipment_dir = 'equipment'
output_dir = 'reservationperdate'
os.makedirs(output_dir, exist_ok=True)
# Define the date range: from the first day of this month to the last day of the month ADDITIONAL_MONTHS later.
today = date.today()
start_date = date(today.year, today.month, 1)
end_month = today.month + ADDITIONAL_MONTHS
end_year = today.year + (end_month - 1) // 12
end_month = ((end_month - 1) % 12) + 1
last_day = calendar.monthrange(end_year, end_month)[1]
end_date = date(end_year, end_month, last_day)
# Define the list of equipment property keys to copy from the equipment JSON files.
equipment_keys = ["is_combination", "is_physical", "current_quantity"]
def add_day(nested_dates, current_date):
"""Ensure that nested_dates[year][month][day] exists and initialize it."""
year = str(current_date.year)
month = f"{current_date.month:02d}"
day = f"{current_date.day:02d}"
if year not in nested_dates:
nested_dates[year] = {}
if month not in nested_dates[year]:
nested_dates[year][month] = {}
if day not in nested_dates[year][month]:
nested_dates[year][month][day] = {"reservations": 0, "available": None}
def remove_empty_containers(nested_dates):
"""Remove empty month and year nodes if they have no day entries."""
years_to_remove = []
for year, months in nested_dates.items():
months_to_remove = []
for month, days in months.items():
if not days: # If month dictionary is empty
months_to_remove.append(month)
for month in months_to_remove:
del nested_dates[year][month]
if not nested_dates[year]:
years_to_remove.append(year)
for year in years_to_remove:
del nested_dates[year]
# Process each JSON file in the plannedequipment directory.
for filename in os.listdir(plannedequipment_dir):
if not filename.endswith('.json'):
continue
# Build full paths for plannedequipment, equipment and output files.
plan_file_path = os.path.join(plannedequipment_dir, filename)
equipment_file_path = os.path.join(equipment_dir, filename)
output_path = os.path.join(output_dir, filename)
# Load the reservation data from the plannedequipment file.
with open(plan_file_path, 'r', encoding='utf-8') as infile:
plan_data = json.load(infile)
# Initialize the nested dates dictionary.
nested_dates = {}
current_date = start_date
while current_date <= end_date:
add_day(nested_dates, current_date)
current_date += timedelta(days=1)
# Process each reservation record.
for record in plan_data.get('data', []):
start_str = record.get('planperiod_start')
end_str = record.get('planperiod_end')
if not start_str or not end_str:
continue
record_start = datetime.fromisoformat(start_str).date()
record_end = datetime.fromisoformat(end_str).date()
# Clamp the reservation period to our target range.
period_start = max(record_start, start_date)
period_end = min(record_end, end_date)
if period_start > period_end:
continue
quantity = record.get('quantity', 0)
current = period_start
while current <= period_end:
year = str(current.year)
month = f"{current.month:02d}"
day = f"{current.day:02d}"
if year in nested_dates and month in nested_dates[year] and day in nested_dates[year][month]:
nested_dates[year][month][day]["reservations"] += quantity
current += timedelta(days=1)
# Initialize equipment properties.
item_properties = {}
current_quantity = 0
# Read the corresponding equipment file.
if os.path.exists(equipment_file_path):
with open(equipment_file_path, 'r', encoding='utf-8') as eq_file:
equipment_data = json.load(eq_file)
equipment_item = equipment_data.get('data', {})
# Copy the desired properties.
for key in equipment_keys:
item_properties[key] = equipment_item.get(key, None)
current_quantity = equipment_item.get('current_quantity', 0)
else:
print(f"Error: Equipment file '{filename}' not found in '{equipment_dir}'. Assuming current_quantity = 0 and default item properties.")
for key in equipment_keys:
item_properties[key] = None
# Update each day with the available quantity and optionally suppress days with zero reservations.
for year in list(nested_dates.keys()):
for month in list(nested_dates[year].keys()):
for day in list(nested_dates[year][month].keys()):
day_data = nested_dates[year][month][day]
reservations = day_data["reservations"]
day_data["available"] = current_quantity - reservations
if SUPPRESS_ZERO and reservations == 0:
del nested_dates[year][month][day]
# If a month becomes empty after removing days, remove the month.
if not nested_dates[year][month]:
del nested_dates[year][month]
# If a year becomes empty after removing months, remove the year.
if not nested_dates[year]:
del nested_dates[year]
# If SUPPRESS_EMPTY_FILE is True and the nested dates dictionary is empty,
# remove the output file if it exists and skip writing.
if SUPPRESS_EMPTY_FILE and not nested_dates:
if os.path.exists(output_path):
os.remove(output_path)
print(f"Removed existing file {output_path} because there are no reservations.")
continue
# Combine the item properties and the nested date reservation data.
output_data = {
"item": item_properties,
"dates": nested_dates
}
# Write the result to the output directory.
with open(output_path, 'w', encoding='utf-8') as outfile:
json.dump(output_data, outfile, indent=2)
print("Reservation, availability, and item data have been processed and saved.")
Step 2
Download the set hiarchy from the API
(takes long, build in delays to prevent throttling)
import json
import time
import requests
import sys
# Configuration variables (adjust these as needed)
BASE_URL = "https://api.rentman.net/" # Replace with your actual base URL
FILTER_ON_TAGS = False # Set to True if you want to apply tag filtering
API_TOKEN = "abc" # Replace with your actual API token
def log_message(message, force=False):
"""Logs a message. You can modify this to log to a file if needed."""
sys.stderr.write(message + "\n")
if force:
print(message)
def count_api_usage():
"""Stub function for counting API usage."""
# Extend this function to track API usage if needed.
pass
def filter_array_by_selections(items, selections):
"""
Filters an array of dictionaries based on the presence of certain keys with truthy values.
:param items: List of dictionaries to filter.
:param selections: List of keys to check.
:return: Filtered list of dictionaries.
"""
filtered = []
for item in items:
for sel in selections:
if item.get(sel):
filtered.append(item)
break
return filtered
def make_request(method, endpoint, data=None):
"""
Makes an HTTP request using the requests library.
:param method: HTTP method ('GET' or 'POST').
:param endpoint: API endpoint (appended to BASE_URL).
:param data: Dictionary of data or query parameters.
:return: Parsed JSON response or None if error occurs.
"""
if data is None:
data = {}
url = BASE_URL + endpoint
headers = {
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json"
}
try:
if method.upper() == 'POST':
response = requests.post(url, headers=headers, json=data)
elif method.upper() == 'GET':
response = requests.get(url, headers=headers, params=data)
else:
log_message(f"Unsupported HTTP method: {method}")
return None
except requests.RequestException as e:
log_message(f"Request exception: {e}")
return None
if not response.ok:
log_message(f"HTTP error: {response.status_code} | Response Body: {response.text}")
try:
return response.json()
except json.JSONDecodeError:
log_message("Error decoding JSON response.")
return None
def get_request(endpoint, params=None):
"""
Performs a GET request and counts API usage.
:param endpoint: API endpoint.
:param params: Query parameters as a dictionary.
:return: Parsed JSON response.
"""
count_api_usage()
return make_request('GET', endpoint, params)
def equipmentsetscontent(filter_on_tags=FILTER_ON_TAGS):
"""
Retrieves equipment sets content by fetching data in batches.
:param filter_on_tags: If True, applies tag filtering.
:return: Complete list of items.
"""
all_items = []
offset = 0
batch_size = 300
total_fetched = 0
while True:
params = {'offset': offset, 'limit': batch_size}
response = get_request('equipmentsetscontent', params)
if not response or 'data' not in response:
print("<pre><h1>Error in API Request</h1>")
print(response)
sys.exit(1)
items = response['data']
all_items.extend(items)
total_fetched += len(items)
# If fewer than batch_size items are returned, no more records are available.
if len(items) < batch_size:
break
offset += batch_size
# If 3000 records have been fetched, pause for 1 second to avoid rate limits.
if total_fetched >= 3000:
log_message("Pausing for 1 second to avoid rate limits...", True)
time.sleep(1)
total_fetched = 0 # Reset the counter after the pause
if filter_on_tags:
all_items = filter_array_by_selections(all_items, ['hoofditem', 'set'])
return all_items
# Main execution
if __name__ == "__main__":
complete_data = equipmentsetscontent()
print(json.dumps(complete_data, indent=4))
Step 3,
Convert the output from step 2 into sets-childs format (parrent with list of childs).
import json
from collections import defaultdict
# Define file paths
input_file = "sets/sets.json"
output_file = "sets/sets-childs.json"
def strip_prefix(equipment_str):
prefix = "/equipment/"
return equipment_str[len(prefix):] if equipment_str.startswith(prefix) else equipment_str
# Load the input JSON file
with open(input_file, "r") as infile:
data = json.load(infile)
# Group child items by parent equipment (after stripping the prefix)
sets_dict = defaultdict(list)
for entry in data:
parent = strip_prefix(entry["parent_equipment"])
child_item = {
"equipment": strip_prefix(entry["equipment"]),
"quantity": entry["quantity"]
}
sets_dict[parent].append(child_item)
# Sort the dictionary by parent keys in ascending numerical order
sorted_sets_dict = {k: sets_dict[k] for k in sorted(sets_dict.keys(), key=lambda x: int(x))}
# Write the output JSON file with sorted parent keys
with open(output_file, "w") as outfile:
json.dump(sorted_sets_dict, outfile, indent=4)
print("The new JSON file has been created successfully.")
Step 4
Calculate set availability into reservationperdate-sets folder.
Prompt is attached:Prompt: prompt to create set reservations.docx
#!/usr/bin/env python3
import os
import json
import math
from collections import defaultdict
# Directories for input and output files
SETS_FILE = os.path.join("sets", "sets-childs.json")
RESERVATION_DIR = "reservationperdate"
OUTPUT_DIR = "reservationperdate-sets"
# Ensure output directory exists
os.makedirs(OUTPUT_DIR, exist_ok=True)
def load_json(filepath):
with open(filepath, "r") as f:
return json.load(f)
def get_child_reservation(child_id):
"""
Given a child equipment id, load its reservation file.
Assumes filename is child_id + ".json" in the RESERVATION_DIR.
Returns a dict with keys "item" and optionally "dates".
"""
filepath = os.path.join(RESERVATION_DIR, f"{child_id}.json")
if os.path.exists(filepath):
return load_json(filepath)
else:
raise FileNotFoundError(f"Reservation file for equipment {child_id} not found at {filepath}")
def extract_date_availability(reservation):
"""
Extract a mapping from (year, month, day) tuple to available quantity.
If a reservation file has no "dates" node, returns an empty dict.
"""
date_avail = {}
dates = reservation.get("dates", {})
for year, months in dates.items():
for month, days in months.items():
for day, data in days.items():
date_avail[(year, month, day)] = data.get("available", 0)
return date_avail
def nested_date_structure(date_dict):
"""
Given a mapping with keys (year, month, day) and value (availability),
build a nested dictionary: year -> month -> day -> {reservations: 0, available: value}
"""
nested = defaultdict(lambda: defaultdict(dict))
for (year, month, day), avail in date_dict.items():
nested[year][month][day] = {"reservations": 0, "available": avail}
return {year: dict(months) for year, months in nested.items()}
def main():
# Load the sets definitions
sets_def = load_json(SETS_FILE)
# Process each set defined in sets-childs.json
for set_id, children in sets_def.items():
child_infos = [] # List of tuples: (child_id, required_quantity, full_stock, date_availability mapping)
for child in children:
child_id = child["equipment"]
req_qty = child["quantity"]
try:
reservation = get_child_reservation(child_id)
except FileNotFoundError as e:
print(e)
continue # Skip this child if its reservation file is missing
full_stock = reservation.get("item", {}).get("current_quantity", 0)
date_avail = extract_date_availability(reservation)
child_infos.append((child_id, req_qty, full_stock, date_avail))
# Calculate the full availability (without date restrictions)
available_sets_full = math.inf
for (_, req_qty, full_stock, _) in child_infos:
child_full_sets = full_stock // req_qty
available_sets_full = min(available_sets_full, child_full_sets)
if available_sets_full == math.inf:
available_sets_full = 0
# Gather the union of all dates from the children
union_dates = set()
for (_, _, full_stock, date_avail) in child_infos:
union_dates.update(date_avail.keys())
# Compute availability per date from the union of dates
date_results = {}
for date in union_dates:
avail_for_date = math.inf
for (_, req_qty, full_stock, date_avail) in child_infos:
# Use date-specific available if provided, otherwise use full stock
child_avail = date_avail.get(date, full_stock)
child_sets = child_avail // req_qty
avail_for_date = min(avail_for_date, child_sets)
date_results[date] = avail_for_date
# Only include dates where the available set count is lower than full availability
filtered_dates = {date: avail for date, avail in date_results.items() if avail < available_sets_full}
# Build output JSON structure following the reservation file structure
output = {}
output["item"] = {
"is_combination": True,
"current_quantity": available_sets_full
}
if filtered_dates:
output["dates"] = nested_date_structure(filtered_dates)
# Write the output JSON file for the set to the OUTPUT_DIR
out_filepath = os.path.join(OUTPUT_DIR, f"{set_id}.json")
with open(out_filepath, "w") as outfile:
json.dump(output, outfile, indent=4)
print(f"Set {set_id}: full availability {available_sets_full} written to {out_filepath}")
if __name__ == "__main__":
main()