270 lines
9.3 KiB
Python
270 lines
9.3 KiB
Python
import sys
|
|
import os
|
|
|
|
# Add the parent directory of herotools to the Python module search path
|
|
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
|
|
|
import time
|
|
import json
|
|
import subprocess
|
|
from typing import Optional,List
|
|
import redis
|
|
from herotools.logger import logger
|
|
from herotools.texttools import name_fix
|
|
from enum import Enum, auto
|
|
from dataclasses import dataclass
|
|
import git
|
|
|
|
|
|
# Initialize Redis client
|
|
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
|
|
|
|
# Define the ChangeType Enum
|
|
class ChangeType(Enum):
|
|
DEL = 'del'
|
|
MOD = 'mod'
|
|
NEW = 'new'
|
|
|
|
@dataclass
|
|
class FileChange:
|
|
commit_hash: str
|
|
commit_time: str
|
|
path: str #relative path in the repo
|
|
change_type: ChangeType
|
|
|
|
|
|
class Repo:
|
|
def __init__(self, cat: str, account: str, name: str, path: str):
|
|
self.cat = cat
|
|
self.account = account
|
|
self.name = name
|
|
self.path = path
|
|
self.hash_last_found: Optional[float] = None
|
|
self.hash_last_processed: Optional[str] = None
|
|
self.lastcheck: Optional[float] = None
|
|
|
|
def __str__(self):
|
|
return json.dumps({
|
|
"cat": self.cat,
|
|
"account": self.account,
|
|
"name": self.name,
|
|
"path": self.path,
|
|
"hash_last_found": self.hash_last_found,
|
|
"hash_last_processed": self.hash_last_processed,
|
|
"lastcheck": self.lastcheck
|
|
}, indent=2)
|
|
|
|
def _redis_key(self) -> str:
|
|
return f"gitcheck:{self.cat}:{self.account}:{self.name}"
|
|
|
|
def save_to_redis(self):
|
|
redis_client.set(self._redis_key(), json.dumps(self.__dict__))
|
|
|
|
@staticmethod
|
|
def load_from_redis(cat: str, account: str, name: str) -> Optional['Repo']:
|
|
redis_key = f"gitcheck:{cat}:{account}:{name}"
|
|
data = redis_client.get(redis_key)
|
|
if data:
|
|
data = json.loads(data)
|
|
repo = Repo(data["cat"], data["account"], data["name"], data["path"])
|
|
repo.hash_last_found = data.get("hash_last_found")
|
|
repo.hash_last_processed = data.get("hash_last_processed")
|
|
repo.lastcheck = data.get("lastcheck")
|
|
return repo
|
|
return None
|
|
|
|
def get_remote_commit_hash(self, branch: str) -> str:
|
|
"""Get the latest commit hash from the remote repository."""
|
|
result = subprocess.run(
|
|
['git', 'ls-remote', 'origin', f'refs/heads/{branch}'],
|
|
cwd=self.path,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True
|
|
)
|
|
if result.returncode != 0:
|
|
raise Exception(f"Error fetching remote commit hash: {result.stderr}")
|
|
|
|
return result.stdout.split()[0]
|
|
|
|
def get_local_commit_hash(self) -> str:
|
|
"""Get the latest commit hash from the local repository."""
|
|
result = subprocess.run(
|
|
['git', 'rev-parse', 'HEAD'],
|
|
cwd=self.path,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True
|
|
)
|
|
if result.returncode != 0:
|
|
raise Exception(f"Error fetching local commit hash: {result.stderr}")
|
|
|
|
return result.stdout.strip()
|
|
|
|
def get_current_branch(self) -> str:
|
|
result = subprocess.run(
|
|
['git', 'branch', '--show-current'],
|
|
cwd=self.path,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True
|
|
)
|
|
if result.returncode != 0:
|
|
raise Exception(f"Error fetching local branch name: {result.stderr}")
|
|
|
|
return result.stdout.split()[0]
|
|
|
|
def get_remote_default_branch(self) -> str:
|
|
result = subprocess.run(
|
|
['git', 'ls-remote', '--symref', 'origin', 'HEAD'],
|
|
cwd=self.path,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True
|
|
)
|
|
if result.returncode != 0:
|
|
raise Exception(f"Error fetching local branch name: {result.stderr}")
|
|
|
|
return result.stdout.split()[1].split('/')[-1]
|
|
|
|
def should_check_again(self) -> bool:
|
|
"""Determine if we should check the repository again based on the last check time."""
|
|
if self.lastcheck is None:
|
|
return True
|
|
return (time.time() - self.lastcheck) > 60
|
|
|
|
def update_last_check_time(self) -> None:
|
|
"""Update the last check time."""
|
|
self.lastcheck = time.time()
|
|
self.save_to_redis()
|
|
|
|
def log_change(self, epoch_time: float) -> None:
|
|
"""Log a detected change in Redis."""
|
|
self.hash_last_found = epoch_time
|
|
self.save_to_redis()
|
|
|
|
def check_for_changes(self, branch: str = 'main') -> None:
|
|
"""Check the repository for updates and log changes if found."""
|
|
if not self.should_check_again():
|
|
print("WAIT TO CHECK FOR CHANGES")
|
|
return
|
|
|
|
try:
|
|
diff_commits = self.get_local_remote_diff_commits(branch)
|
|
|
|
if diff_commits != []:
|
|
print("FOUND SOME CHANGES")
|
|
self.log_change(time.time())
|
|
file_changes = self.get_file_changes_from_commits(diff_commits)
|
|
self.print_file_changes(file_changes)
|
|
else:
|
|
print("NO CHANGED FOUND")
|
|
|
|
self.update_last_check_time()
|
|
except Exception as e:
|
|
print(f"An error occurred while checking repo {self.path}: {e}")
|
|
|
|
def get_local_remote_diff_commits(self, branch: str) -> List[git.Commit]:
|
|
# Open the repository
|
|
repo = git.Repo(self.path)
|
|
|
|
# Get the local branch
|
|
local_branch = repo.heads[branch]
|
|
# Get the remote reference for the branch
|
|
remote_ref = repo.remotes.origin.refs[branch]
|
|
|
|
# Fetch the latest changes from the remote
|
|
repo.remotes.origin.fetch()
|
|
|
|
# Get the commit hashes of the local and remote branches
|
|
local_commit = local_branch.commit
|
|
remote_commit = remote_ref.commit
|
|
|
|
if local_commit == remote_commit:
|
|
return []
|
|
|
|
# Get the common ancestor commit
|
|
base_commit = repo.merge_base(local_commit, remote_commit)[0]
|
|
|
|
# Get the ahead and behind commits
|
|
ahead_commits = list(repo.iter_commits(f"{base_commit}..{local_commit}"))
|
|
behind_commits = list(repo.iter_commits(f"{base_commit}..{remote_commit}"))
|
|
|
|
# Combine the ahead and behind commits
|
|
diff_commits = ahead_commits + behind_commits
|
|
return diff_commits
|
|
|
|
def get_file_changes_from_commits(self, commit_list: List[git.Commit]) -> List[FileChange]:
|
|
file_changes = []
|
|
for commit in commit_list:
|
|
# print(commit)
|
|
diffs = commit.diff(self.hash_last_processed, create_patch=True)
|
|
# print(diffs)
|
|
for diff in diffs:
|
|
if diff.deleted_file:
|
|
change_type = ChangeType.DEL
|
|
elif diff.new_file:
|
|
change_type = ChangeType.NEW
|
|
else:
|
|
change_type = ChangeType.MOD
|
|
|
|
file_change = FileChange(
|
|
commit_hash=commit.hexsha,
|
|
commit_time=str(commit.committed_datetime),
|
|
path=diff.b_path if diff.new_file else diff.a_path,
|
|
change_type=change_type
|
|
)
|
|
file_changes.append(file_change)
|
|
return file_changes
|
|
|
|
def print_file_changes(self, file_changes: List[FileChange]):
|
|
for file_change in file_changes:
|
|
print("------------------------------------")
|
|
print(f"Commit Hash: {file_change.commit_hash}")
|
|
print(f"Commit Time: {file_change.commit_time}")
|
|
print(f"File Path: {file_change.path}")
|
|
print(f"Change Type: {file_change.change_type.value}")
|
|
print("------------------------------------")
|
|
|
|
def gitscan(path: str, cat: str) -> None:
|
|
"""Walk over directories to find Git repositories and check them."""
|
|
path = os.path.abspath(os.path.expanduser(path))
|
|
for root, dirs, files in os.walk(path):
|
|
if '.git' in dirs:
|
|
accountname = os.path.basename(os.path.dirname(root))
|
|
reponame = os.path.basename(root)
|
|
repo = Repo.load_from_redis(cat, accountname, reponame)
|
|
if repo is None:
|
|
repo = Repo(cat, accountname, reponame, root)
|
|
branch = repo.get_current_branch()
|
|
|
|
logger.debug(f"root: {root}")
|
|
logger.debug(f"accountname: {accountname}")
|
|
logger.debug(f"reponame: {reponame}")
|
|
logger.debug(f"branch: {branch}")
|
|
logger.debug(f"repo: {repo}")
|
|
|
|
repo.check_for_changes(branch)
|
|
dirs[:] = [] # Don't go deeper into subdirectories
|
|
else:
|
|
# Filter out any .git directories from further traversal
|
|
dirs[:] = [d for d in dirs if d != '.git']
|
|
|
|
def print_redis_client():
|
|
cursor = 0
|
|
while True:
|
|
cursor, keys = redis_client.scan(cursor)
|
|
for key in keys:
|
|
value = redis_client.get(key)
|
|
print(key)
|
|
print(value)
|
|
print()
|
|
if cursor == 0:
|
|
break
|
|
|
|
if __name__ == "__main__":
|
|
# print_redis_client()
|
|
mypath = "~/code/git.threefold.info/projectmycelium"
|
|
category = 'mycat'
|
|
gitscan(path=mypath, cat=category)
|
|
# print_redis_client() |