Scraping MorphoSource with Python and GitHub Actions
MorphoSource is a digital repository for 3D media of natural history specimens. Sometimes, you may want to automate the monitoring of new records on MorphoSource, especially if you’re working on scientific research or keeping track of the latest specimen data. In this blog post, we’ll walk through:
- Inspecting elements on a MorphoSource webpage to identify how to scrape data.
- Using GitHub Actions to run a Python script that fetches and processes that data.
1. Inspecting the MorphoSource Webpage
For this demonstration, let’s look at a specific MorphoSource media page:
https://www.morphosource.org/concern/media/000699076
Why “Inspect Element”?
- Identify HTML structure: By right-clicking (in most browsers) and selecting Inspect Element, we can see how the page is structured.
- Find relevant tags/attributes: You may discover unique IDs, classes, or meta tags that contain the data you want to scrape.
- Locate patterns: MorphoSource uses a consistent format for search results and detail pages. Observing these patterns can help you figure out the best approach for scraping.
When you open your browser’s Developer Tools (e.g., Chrome DevTools), you’ll see the DOM structure, CSS, and network activity. This helps determine if:
- The data is available in the page’s HTML.
- There are relevant meta or div tags containing the values you need.
- You need to parse dynamic JavaScript content (in which case you might consider a headless browser approach).
For the search results page at https://www.morphosource.org/catalog/media?q=X-Ray+Computed+Tomography, you can see that each media record is displayed under <li class="document blacklight-media">. Inspecting those elements reveals the metadata fields for each record, such as Title, Taxonomy, Element or Part, etc.
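To make that structure concrete, here is a minimal sketch that pulls titles and links out of markup shaped like the search results. The SAMPLE_HTML snippet, the record IDs in it, and the RecordLinkParser class are hypothetical and heavily simplified; the real page contains far more markup. The sketch uses the standard library's html.parser only so that it runs without extra dependencies, whereas the full script later in this post uses BeautifulSoup.

```python
from html.parser import HTMLParser

# Hypothetical, simplified markup modeled on the structure described above.
SAMPLE_HTML = """
<div id="search-results">
  <ul>
    <li class="document blacklight-media">
      <h3 class="search-result-title"><a href="/concern/media/000000001">Sample skull</a></h3>
    </li>
    <li class="document blacklight-media">
      <h3 class="search-result-title"><a href="/concern/media/000000002">Sample femur</a></h3>
    </li>
    <li class="other">Not a record</li>
  </ul>
</div>
"""


class RecordLinkParser(HTMLParser):
    """Collects (title, href) pairs for links inside record <li> elements."""

    def __init__(self):
        super().__init__()
        self.in_record = False    # True while inside a matching <li>
        self.current_href = None  # href of the link whose text we are waiting for
        self.records = []

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        classes = set((attrs.get("class") or "").split())
        if tag == "li":
            # A record <li> carries both classes, in any order.
            self.in_record = {"document", "blacklight-media"} <= classes
        elif tag == "a" and self.in_record:
            self.current_href = attrs.get("href")

    def handle_data(self, data):
        # The first non-blank text after a record link is treated as its title.
        if self.current_href and data.strip():
            self.records.append((data.strip(), self.current_href))
            self.current_href = None

    def handle_endtag(self, tag):
        if tag == "li":
            self.in_record = False


parser = RecordLinkParser()
parser.feed(SAMPLE_HTML)
for title, href in parser.records:
    print(title, href)
```

The sketch does not handle nested lists inside a record, which is one reason a selector-based library is more comfortable for the real page.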
2. How GitHub Actions Can Run Python Code
GitHub Actions allows you to automate workflows directly in your GitHub repository. These workflows can run Python code to scrape MorphoSource or perform any other data processing tasks. Here’s the general flow:
- Create a GitHub Actions workflow file in your repository’s .github/workflows directory (e.g., scrape_morphosource.yml).
- Specify your environment (e.g., runs-on: ubuntu-latest) and install dependencies like requests and beautifulsoup4.
- Run the Python script that scrapes or fetches data from MorphoSource.
By using GitHub Actions, you can schedule these scrapes to run periodically (e.g., every day) or trigger them on specific events (e.g., after pushing code).
3. The Python Scraping Code
Below is a Python script (morphosource_scraper.py) that performs the following:
- Fetches the total record count for X-ray Computed Tomography data on MorphoSource.
- Identifies new records since the last run.
- Retrieves the top new records and formats a release message.
You can store this script in your repository, such as in a scripts/ folder, then reference it in your GitHub Actions workflow.
#!/usr/bin/env python3
import os
import re
import sys
import time

import requests
from bs4 import BeautifulSoup

SEARCH_URL = (
    "https://www.morphosource.org/catalog/media?locale=en"
    "&q=X-Ray+Computed+Tomography&search_field=all_fields"
    "&sort=system_create_dtsi+desc"
)
BASE_URL = "https://www.morphosource.org"
LAST_COUNT_FILE = ".github/last_count.txt"


class MorphoSourceTemporarilyUnavailable(Exception):
    """Custom exception for when MorphoSource is temporarily unavailable"""
    pass


def check_for_server_error(response_text):
    """Check if the response indicates a server error"""
    if "MorphoSource temporarily unavailable (500)" in response_text:
        raise MorphoSourceTemporarilyUnavailable("MorphoSource is temporarily unavailable (500 error)")


def get_current_record_count(max_retries=3):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    for attempt in range(max_retries):
        try:
            response = requests.get(SEARCH_URL, headers=headers, timeout=30)
            response.raise_for_status()
            # Debug output
            print(f"Response status code: {response.status_code}", file=sys.stderr)
            print("First 500 characters of response:", response.text[:500], file=sys.stderr)
            # Check for server error before proceeding
            check_for_server_error(response.text)
            soup = BeautifulSoup(response.text, 'html.parser')
            # Method 1: Meta tag
            meta_tag = soup.find('meta', {'name': 'totalResults'})
            if meta_tag and meta_tag.get('content'):
                return int(meta_tag['content'])
            # Method 2: Search results count text
            results_text = soup.select_one('div.page-links')
            if results_text:
                text = results_text.get_text()
                if match := re.search(r'(\d+)\s+results?', text):
                    return int(match.group(1))
            # Method 3: Count actual results
            # (last resort: this only counts the records on the current page, not the total)
            results = soup.select('div#search-results li.document.blacklight-media')
            if results:
                return len(results)
            if attempt < max_retries - 1:
                time.sleep(5)  # Wait before retry
                continue
            raise ValueError("Could not find result count using any method")
        except MorphoSourceTemporarilyUnavailable as e:
            print(f"Server Error: {str(e)}", file=sys.stderr)
            if attempt < max_retries - 1:
                print(f"Retrying in 5 seconds (attempt {attempt + 1}/{max_retries})", file=sys.stderr)
                time.sleep(5)
                continue
            raise
        except requests.RequestException as e:
            print(f"Request failed (attempt {attempt + 1}/{max_retries}): {e}", file=sys.stderr)
            if attempt < max_retries - 1:
                time.sleep(5)
                continue
            raise
    raise ValueError("Failed to get record count after all retries")

def load_last_count():
    if not os.path.exists(LAST_COUNT_FILE):
        return 0
    try:
        with open(LAST_COUNT_FILE, "r") as f:
            return int(f.read().strip())
    except ValueError:
        return 0


def save_last_count(count):
    os.makedirs(os.path.dirname(LAST_COUNT_FILE), exist_ok=True)
    with open(LAST_COUNT_FILE, "w") as f:
        f.write(str(count))


def parse_top_records(n=3):
    """
    Grabs the first n <li class="document blacklight-media"> from the search results
    (descending by creation date). Returns a list of dicts containing relevant metadata.
    """
    session = requests.Session()
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    resp = session.get(SEARCH_URL, headers=headers, timeout=30)
    resp.raise_for_status()
    # Check for server error
    check_for_server_error(resp.text)
    soup = BeautifulSoup(resp.text, "html.parser")
    li_list = soup.select("div#search-results li.document.blacklight-media")[:n]
    records = []
    for li in li_list:
        record = {}
        # 1) Title & detail link
        title_el = li.select_one("h3.search-result-title a")
        if title_el:
            record["title"] = title_el.get_text(strip=True)
            record["detail_url"] = BASE_URL + title_el.get("href", "")
        else:
            record["title"] = "No Title"
            record["detail_url"] = None
        # 2) Additional metadata from dt/dd pairs
        metadata_dl = li.select_one("div.metadata dl.dl-horizontal")
        if metadata_dl:
            items = metadata_dl.select("div.index-field-item")
            for item in items:
                dt = item.select_one("dt")
                dd = item.select_one("dd")
                if dt and dd:
                    field_name = dt.get_text(strip=True).rstrip(":")
                    field_value = dd.get_text(strip=True)
                    record[field_name] = field_value
        records.append(record)
    return records

def format_release_message(new_records, old_count, records):
    """
    Creates a multiline string for the Release body:
    - How many new records (plus old record value)
    - Then each record in descending order, labeled as "New Record #..."
    """
    lines = []
    lines.append("A new increase in X-ray Computed Tomography records was found on MorphoSource.")
    lines.append("")
    lines.append(f"We found {new_records} new record(s) (old record value: {old_count}).")
    lines.append("")
    for i, rec in enumerate(records, start=1):
        # Newest record gets the highest number; count down from the new total
        record_number = old_count + new_records - (i - 1)
        lines.append(f"New Record #{record_number} Title: {rec.get('title', 'N/A')}")
        lines.append(f"Detail Page URL: {rec.get('detail_url', 'N/A')}")
        for key in [
            "Object",
            "Taxonomy",
            "Element or Part",
            "Data Manager",
            "Date Uploaded",
            "Publication Status",
            "Rights Statement",
            "CC License",
        ]:
            if key in rec:
                lines.append(f"{key}: {rec[key]}")
        lines.append("")  # Blank line after each record
    return "\n".join(lines)


def write_github_output(is_new_data, message):
    """Helper function to write GitHub output"""
    github_output = os.environ.get("GITHUB_OUTPUT")
    if github_output:
        with open(github_output, "a") as fh:
            fh.write(f"new_data={str(is_new_data).lower()}\n")
            fh.write("details<<EOF\n")
            fh.write(message + "\n")
            fh.write("EOF\n")


def main():
    try:
        current_count = get_current_record_count()
        print(f"Current count: {current_count}", file=sys.stderr)
        old_count = load_last_count()
        print(f"Old count: {old_count}", file=sys.stderr)
        new_records = current_count - old_count
        print(f"New records: {new_records}", file=sys.stderr)
        if new_records > 0:
            records_to_fetch = min(new_records, 3)
            top_records = parse_top_records(n=records_to_fetch)
            save_last_count(current_count)
            message = format_release_message(new_records, old_count, top_records)
            write_github_output(True, message)
        else:
            write_github_output(False, "No new records found.")
    except MorphoSourceTemporarilyUnavailable as e:
        print(f"Server Error: {str(e)}", file=sys.stderr)
        write_github_output(False, "Error: MorphoSource is temporarily unavailable. Please try again later.")
        sys.exit(0)  # Exit gracefully
    except Exception as e:
        print(f"Error in main: {str(e)}", file=sys.stderr)
        write_github_output(False, f"Error: {str(e)}")
        raise


if __name__ == "__main__":
    main()
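One detail of Method 2 in get_current_record_count is worth a closer look. The pattern (\d+)\s+results? only captures an unbroken run of digits, so if the page were ever to print the total with thousands separators (an assumption here; the live page's wording was not verified), it would pick up only the digits after the last comma. The sketch below shows a comma-tolerant variant. COUNT_PATTERN, extract_count, and the sample strings are hypothetical names and inputs chosen for illustration.

```python
import re

# Accepts either a plain run of digits or groups separated by commas,
# followed by whitespace and the word "result" or "results".
COUNT_PATTERN = re.compile(r"(\d{1,3}(?:,\d{3})+|\d+)\s+results?")


def extract_count(text):
    """Return the integer count found in text, or None if there is none."""
    match = COUNT_PATTERN.search(text)
    if match is None:
        return None
    return int(match.group(1).replace(",", ""))


# Hypothetical strings; they are not copied from the real site.
samples = [
    "1 - 10 of 98765 results",
    "1 - 10 of 123,456 results",
    "1 result",
    "nothing here",
]
for sample in samples:
    print(repr(sample), "->", extract_count(sample))

# For comparison, the simpler pattern on a comma-formatted total:
simple = re.search(r"(\d+)\s+results?", "1 - 10 of 123,456 results")
print("simple pattern captures:", simple.group(1))
```

If the meta tag from Method 1 is present, none of this matters, since that value is parsed directly.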
Key Points in the Script
- SEARCH_URL and BASE_URL: The starting points for retrieving and constructing links to the desired search results and detail pages.
- get_current_record_count: Fetches the total number of records found in the search. This includes multiple methods of extraction (meta tags, text matching, etc.).
- parse_top_records: Extracts metadata for the top new records (e.g., title, taxonomy, date uploaded, etc.).
- LAST_COUNT_FILE: Stores the previous count of records in .github/last_count.txt. This is how the script detects new records since the last run.
- write_github_output: Writes information back to GitHub Actions so you can use it in subsequent steps.
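To see what write_github_output produces, the sketch below writes the same two outputs to a temporary file that stands in for the file GitHub Actions provides through the GITHUB_OUTPUT environment variable. It differs from the script in one respect: it uses a random delimiter instead of the fixed word EOF, so a message that happens to contain a line reading EOF cannot end the value early. The function name write_outputs, the ghadelim_ prefix, and the sample message are assumptions made for this illustration.

```python
import os
import tempfile
import uuid


def write_outputs(path, is_new_data, message):
    """Append a single-line and a multiline output in GITHUB_OUTPUT format."""
    # A random delimiter is very unlikely to collide with a line in the message.
    delimiter = f"ghadelim_{uuid.uuid4().hex}"
    with open(path, "a", encoding="utf-8") as fh:
        # Single-line output: name=value
        fh.write(f"new_data={str(is_new_data).lower()}\n")
        # Multiline output: name<<DELIMITER, the value, then DELIMITER alone
        fh.write(f"details<<{delimiter}\n{message}\n{delimiter}\n")
    return delimiter


# Local dry run: a temporary file replaces the one GitHub would supply.
with tempfile.TemporaryDirectory() as tmp:
    out_path = os.path.join(tmp, "github_output.txt")
    delim = write_outputs(out_path, True, "We found 2 new record(s).\nEOF\nsecond line")
    with open(out_path, encoding="utf-8") as fh:
        written = fh.read().splitlines()

for line in written:
    print(line)
```

In a workflow, a later step could then read these values through the step's outputs, for example to decide whether to publish a release.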
Happy scraping!