
Process Log Files

By Atif Alam

This page walks through how to process log files with Python: read input, extract fields, aggregate, and report. The script uses the standard library only — no third-party packages and no regular expressions. You add one piece at a time and run python process_logs.py after each step so nothing piles up in your head.

Create a file process_logs.py and an app.log in the same folder (use the sample below). Python 3.9+ is enough for the typing used here. For more on looping over files, see Loops.

You have plain-text application logs. Each line looks like: date, time, severity, then the rest of the message. You want to:

  • Count how many lines you see per severity (for example INFO, WARN, ERROR).
  • For ERROR lines, count how many fall into each hour so you can spot spikes.
  • Print a short summary to the terminal.

Real logs are often messy: skip lines that do not match the shape you expect instead of crashing.

Create app.log in the same folder as your script with content like this:

2024-03-19 10:15:01 INFO service=auth msg="Startup complete"
2024-03-19 10:45:22 ERROR service=auth msg="Login failed" user=guest
2024-03-19 10:47:01 WARN service=api msg="High latency"
2024-03-19 11:02:15 ERROR service=db msg="Connection timeout"
2024-03-19 11:15:00 INFO service=auth msg="User logged in" user=atif

Each line: YYYY-MM-DD HH:MM:SS LEVEL message... (fields separated by spaces; the message may contain spaces).

What you’ll build: A generator that reads lines without loading the whole file, a split-based parser, an analyze function that counts and buckets, and a small report printer. You’ll replace the if __name__ == "__main__": block in each step (or grow it as shown).


Step 1 — Read the File One Line at a Time

Goal: Open app.log and stream lines one at a time. Prove it with a line count and a short preview.

pathlib.Path keeps paths readable. Iterating for line in f avoids reading the entire file into memory at once (important for large logs).

The preview truncates long lines with first[:80] — string slicing, same idea as list slices in Language basics.

from pathlib import Path

LOG_FILE = Path("app.log")

def read_lines(path: Path):
    """Yield stripped lines one at a time (memory-friendly for big files)."""
    # utf-8 is typical; errors="replace" avoids crashing on odd bytes
    with path.open("r", encoding="utf-8", errors="replace") as f:
        for line in f:
            yield line.strip()

if __name__ == "__main__":
    if not LOG_FILE.is_file():
        print(f"Missing {LOG_FILE}. Create it using the sample log above.")
        raise SystemExit(1)
    # One pass: count every line; keep only
    # the first line for preview (no list of all lines).
    n = 0
    first = None
    for line in read_lines(LOG_FILE):
        if first is None:
            first = line
        n += 1
    print("Step 1 OK:", n, "lines")
    # If the file had at least one line,
    # show a short preview (truncate long lines).
    if first:
        preview = first if len(first) <= 80 else first[:80] + "..."
        print("First line preview:", preview)

Check: Run python process_logs.py. You should see Step 1 OK: 5 lines (or your line count) and a preview of the first line.


Step 2 — Parse a Line into Fields

Goal: Turn a single log line into a small dict (timestamp, severity, message), or None if the line does not fit.

line.split(maxsplit=3) splits into at most four parts: date, time, severity, and everything else as the message (so spaces inside the message stay intact).

Add parse_line below read_lines (still above the if __name__ block), and replace your __main__ block with the one below. The new pieces in this step are the typing import, parse_line, SAMPLE_LINE, and the __main__ body; the rest matches Step 1.

from pathlib import Path
from typing import Optional

LOG_FILE = Path("app.log")

def read_lines(path: Path):
    """Yield stripped lines one at a time (memory-friendly for big files)."""
    with path.open("r", encoding="utf-8", errors="replace") as f:
        for line in f:
            yield line.strip()

def parse_line(line: str) -> Optional[dict[str, str]]:
    """
    Expect: date time LEVEL rest...
    maxsplit=3 → four parts; the last part is the full message tail.
    """
    if not line:
        return None
    # Split on whitespace into: date, time, severity, and the remaining message.
    parts = line.split(maxsplit=3)
    if len(parts) < 4:  # malformed / too short — skip later instead of crashing
        return None
    date_s, time_s, severity, message = parts
    return {
        "timestamp": f"{date_s} {time_s}",
        "severity": severity,
        "message": message,
    }

# One known-good line so you can test the parser without a file
SAMPLE_LINE = '2024-03-19 10:15:01 INFO service=auth msg="Startup complete"'

if __name__ == "__main__":
    print("Step 2 sample parse:", parse_line(SAMPLE_LINE))
    if not LOG_FILE.is_file():
        print(f"Missing {LOG_FILE}. Create it using the sample log above.")
        raise SystemExit(1)
    for line in read_lines(LOG_FILE):
        row = parse_line(line)
        print(line[:60], "->", row)
        break  # only show first parsed row from file

Check: Run the script. The sample should print a dict with timestamp, severity, and message. The first file line should show a similar shape.

Match your parser to a stable log layout; if the layout changes, update the parser (or move to structured logs — see below).


Step 3 — Count Severities and Bucket Errors by Hour


Goal: Walk the whole file, count each severity, and for ERROR rows count how many fall in each clock hour.

Add imports: Counter, defaultdict, and datetime. Add analyze after parse_line. Replace __main__ with the version below (you can remove SAMPLE_LINE and the single-line loop from Step 2, or leave SAMPLE_LINE for quick experiments).

Counter tallies severities. defaultdict(int) lets you do errors_by_hour[hour_bucket] += 1 without checking if the key exists. strptime / strftime turn the timestamp string into an hour bucket.

from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
from typing import Optional

LOG_FILE = Path("app.log")

def read_lines(path: Path):
    """Yield stripped lines one at a time (memory-friendly for big files)."""
    with path.open("r", encoding="utf-8", errors="replace") as f:
        for line in f:
            yield line.strip()

def parse_line(line: str) -> Optional[dict[str, str]]:
    if not line:
        return None
    parts = line.split(maxsplit=3)
    if len(parts) < 4:
        return None
    date_s, time_s, severity, message = parts
    return {
        "timestamp": f"{date_s} {time_s}",
        "severity": severity,
        "message": message,
    }

def analyze(path: Path):
    severity_counts: Counter[str] = Counter()
    errors_by_hour: dict[str, int] = defaultdict(int)
    errors: list[dict] = []
    for line in read_lines(path):
        row = parse_line(line)
        if row is None:
            continue  # skip garbage lines
        severity_counts[row["severity"]] += 1
        if row["severity"] == "ERROR":
            # Parse timestamp once, then bucket to the top of the hour
            ts = datetime.strptime(row["timestamp"], "%Y-%m-%d %H:%M:%S")
            hour_bucket = ts.strftime("%Y-%m-%d %H:00")
            errors_by_hour[hour_bucket] += 1
            errors.append(row)
    return severity_counts, errors_by_hour, errors

if __name__ == "__main__":
    if not LOG_FILE.is_file():
        print(f"Missing {LOG_FILE}. Create it using the sample log above.")
        raise SystemExit(1)
    counts, by_hour, err_rows = analyze(LOG_FILE)
    print("Step 3 OK — severity counts:", dict(counts))
    print("Errors by hour:", dict(by_hour))
    print("Total ERROR rows stored:", len(err_rows))

Check: With the sample app.log, you should see two ERROR lines split across hours 2024-03-19 10:00 and 2024-03-19 11:00, and counts for INFO, WARN, and ERROR.


Step 4 — Print a Human-Readable Report

Goal: Format the aggregates for humans: severity totals, a simple per-hour bar for errors, and the last few error messages.

Add print_report after analyze. Replace __main__ to call print_report instead of printing raw dicts. The # bar uses min(n, 40) so one huge count does not flood the terminal.

from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
from typing import Optional

LOG_FILE = Path("app.log")

def read_lines(path: Path):
    """Yield stripped lines one at a time (memory-friendly for big files)."""
    with path.open("r", encoding="utf-8", errors="replace") as f:
        for line in f:
            yield line.strip()

def parse_line(line: str) -> Optional[dict[str, str]]:
    if not line:
        return None
    parts = line.split(maxsplit=3)
    if len(parts) < 4:
        return None
    date_s, time_s, severity, message = parts
    return {
        "timestamp": f"{date_s} {time_s}",
        "severity": severity,
        "message": message,
    }

def analyze(path: Path):
    severity_counts: Counter[str] = Counter()
    errors_by_hour: dict[str, int] = defaultdict(int)
    errors: list[dict] = []
    for line in read_lines(path):
        row = parse_line(line)
        if row is None:
            continue  # skip malformed lines
        severity_counts[row["severity"]] += 1
        if row["severity"] == "ERROR":
            ts = datetime.strptime(row["timestamp"], "%Y-%m-%d %H:%M:%S")
            hour_bucket = ts.strftime("%Y-%m-%d %H:00")
            errors_by_hour[hour_bucket] += 1
            errors.append(row)
    return severity_counts, errors_by_hour, errors

def print_report(severity_counts, errors_by_hour, errors) -> None:
    print("=== Counts by Severity ===")
    for severity, n in severity_counts.most_common():
        print(f" {severity}: {n}")
    print("\n=== Errors by Hour ===")
    for hour in sorted(errors_by_hour):
        n = errors_by_hour[hour]
        bar = "#" * min(n, 40)  # cap width for huge counts
        print(f" {hour} {bar} ({n})")
    print("\n=== Last Few Errors ===")
    for row in errors[-5:]:
        print(f" [{row['timestamp']}] {row['message']}")

if __name__ == "__main__":
    if not LOG_FILE.is_file():
        print(f"Missing {LOG_FILE}. Create it using the sample log above.")
        raise SystemExit(1)
    counts, by_hour, err_rows = analyze(LOG_FILE)
    print_report(counts, by_hour, err_rows)

Check: Run the script. You should see three sections: counts by severity, error bars by hour, and the last few error lines.


The approaches below also avoid regex. Use whichever one matches how your logs are actually written.

  • One JSON object per line: json.loads(line) inside try / except json.JSONDecodeError; then read fields from the dict.
  • Mostly key=value tokens: loop over tokens with str.partition("=") and build a dict (strip quotes from values if needed).
  • Comma- or tab-separated exports: the csv module, or a careful split if quoting is simple.

Some logs are free-form or inconsistent; teams often prefer structured logging (JSON) so analysis stays simple. You do not need regex for the common cases above.
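For the first two shapes, minimal sketches look like this (parse_payload and parse_json_line are illustrative names, not part of the script above, and the key=value version assumes quoted values contain no spaces):

```python
import json
from typing import Optional

def parse_payload(payload: str) -> dict[str, str]:
    """Turn a tail of key=value tokens into a dict.

    Tokens without '=' are ignored; surrounding quotes are stripped.
    Caveat: splitting on spaces breaks quoted values that contain
    spaces — fine for quick tooling, not for adversarial input.
    """
    fields: dict[str, str] = {}
    for token in payload.split():
        key, sep, value = token.partition("=")
        if sep:  # keep only tokens that actually had an '='
            fields[key] = value.strip('"')
    return fields

def parse_json_line(line: str) -> Optional[dict]:
    """Parse one JSON-object-per-line record; None if it is not valid JSON."""
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        return None
```

You could call parse_payload on the "message" field that parse_line already extracts, turning service=auth user=guest into {"service": "auth", "user": "guest"}.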

  • Memory: Iterate over the file object instead of read() or readlines() for large inputs.
  • Robustness: Malformed lines → skip or log a count; do not assume every line is perfect.
  • Aggregation: Counter and defaultdict are compact ways to tally and bucket in the standard library.
  • Next steps: You can extend the same idea with Path.glob for multiple files, writing CSV or JSON output, or alerting when counts cross a threshold in a time window.
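As one example of that multi-file extension, a sketch using Path.glob (analyze_many is a hypothetical helper, reusing the same date time LEVEL message line shape as the tutorial):

```python
from collections import Counter
from pathlib import Path

def analyze_many(log_dir: Path, pattern: str = "*.log") -> Counter[str]:
    """Tally severities across every file in log_dir matching pattern."""
    counts: Counter[str] = Counter()
    # sorted() gives a stable file order across runs
    for path in sorted(log_dir.glob(pattern)):
        with path.open("r", encoding="utf-8", errors="replace") as f:
            for line in f:
                parts = line.split(maxsplit=3)
                if len(parts) == 4:  # skip malformed lines, as before
                    counts[parts[2]] += 1
    return counts
```

From here you could feed the combined Counter straight into print_report-style output, or compare per-file counts against a threshold.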