git.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. import os
  2. import sys
  3. from io import RawIOBase
  4. from dulwich import porcelain
  5. from dulwich.porcelain import get_remote_repo
  6. from dulwich.repo import Repo
  7. from dulwich.walk import Walker
  8. import requests
  def ro():
      """Run the ``S01mount_ro`` init script with the ``start`` action.

      Judging by the names, this presumably switches the filesystem to
      read-only (confirm against the init script). The exit status of the
      shell command is discarded.
      """
      command = "/etc/init.d/S01mount_ro start"
      os.system(command)
  def rw():
      """Run the ``S01mount_ro`` init script with the ``stop`` action.

      Judging by the names, this presumably makes the filesystem writable
      again (confirm against the init script). The exit status of the shell
      command is discarded.
      """
      command = "/etc/init.d/S01mount_ro stop"
      os.system(command)
  class NoneStream(RawIOBase):
      """Fallback if stdout or stderr are unavailable, does nothing.

      Every I/O method below ignores its arguments and returns ``None``.
      """

      def read(self, size=-1):
          """Ignore *size* and return ``None``."""

      def readall(self):
          """Return ``None``."""

      def readinto(self, b):
          """Leave *b* untouched and return ``None``."""

      def write(self, b):
          """Discard *b* and return ``None``."""
  def log(localRep):
      """Print the commits reachable from the repository's refs to stdout.

      Args:
          localRep: Path of the local repository to open.

      For each commit yielded by the walker this prints the commit id, the
      author, the raw ``author_time`` value and the commit message, with a
      blank line before and after the message.
      """
      repo = Repo(localRep)
      # Seed the walk with every ref target, sorted in descending order.
      walker = Walker(repo, sorted(repo.get_refs().values(), reverse=True))
      for entry in walker:
          commit = entry.commit
          print(f"commit {commit.id.decode()}")
          # dulwich keeps the author as one identity string (normally
          # "Name <email>"), so it is printed once; the previous code
          # interpolated it twice and produced "Name <email> <Name <email>>".
          print(f"Author: {commit.author.decode()}")
          print(f"Date: {commit.author_time}")
          print()
          print(f" {commit.message.decode()}")
          print()
  def __log__(localRep):
      """Return a description of the first commit yielded by the walk.

      Args:
          localRep: Path of the local repository to open.

      Returns:
          A 4-tuple of strings ``(commit line, author line, date line,
          message line)`` for the first walker entry, or ``None`` when the
          walk yields no commits.
      """
      repo = Repo(localRep)
      # Seed the walk with every ref target, sorted in descending order.
      walker = Walker(repo, sorted(repo.get_refs().values(), reverse=True))
      for entry in walker:
          commit = entry.commit
          # Only the first entry is reported, hence the return inside the loop.
          # dulwich keeps the author as one identity string (normally
          # "Name <email>"), so it is emitted once instead of being duplicated.
          return (
              f"commit {commit.id.decode()}",
              f"Author: {commit.author.decode()}",
              f"Date: {commit.author_time}",
              f" {commit.message.decode()}",
          )
      return None
  def showChanges(localRep, outStream=sys.stdout):
      """Write the output of ``porcelain.show`` for *localRep* to *outStream*.

      Args:
          localRep: Repository (path) handed to ``porcelain.show``.
          outStream: Destination stream. The default is the ``sys.stdout``
              object that existed when this function was defined.
      """
      destination = outStream
      porcelain.show(localRep, outstream=destination)
  def activeBranch(localRep):
      """Return whatever ``porcelain.active_branch`` reports for *localRep*."""
      current = porcelain.active_branch(localRep)
      return current
  def checkout(localRep, branch, force=True, progStream=None):
      """Check out *branch* in *localRep*, retrying once after a fetch.

      Args:
          localRep: Path of the local repository.
          branch: Branch handed to ``porcelain.checkout_branch``.
          force: Forwarded to ``porcelain.checkout_branch``.
          progStream: Stream passed as ``outstream``; a ``NoneStream`` is
              used when it is ``None``.

      If the first attempt raises ``porcelain.CheckoutError`` the error is
      printed, the repository is fetched and the checkout is attempted once
      more; an error from that second attempt propagates to the caller.
      """
      repo = Repo(localRep)
      sink = NoneStream() if progStream is None else progStream

      def switch():
          porcelain.checkout_branch(repo, branch, force=force, outstream=sink)

      try:
          switch()
      except porcelain.CheckoutError as err:
          print("CheckoutError:", err)
          porcelain.fetch(localRep)
          switch()
  def pull(localRep, progStream=None, force=True, depth=3):
      """Pull the active branch of *localRep*, retrying once after a fetch.

      Args:
          localRep: Path of the local repository.
          progStream: Stream passed as ``outstream``; a ``NoneStream`` is
              used when it is ``None``.
          force: Forwarded to ``porcelain.pull``.
          depth: Forwarded to ``porcelain.pull``.

      If the first attempt raises ``porcelain.CheckoutError`` the error is
      printed, the repository is fetched and the pull is attempted once more;
      an error from that second attempt propagates to the caller.
      """
      sink = NoneStream() if progStream is None else progStream

      def pull_active_branch():
          # The active branch is looked up again on every attempt.
          porcelain.pull(
              localRep,
              refspecs=porcelain.active_branch(localRep),
              force=force,
              outstream=sink,
              depth=depth,
          )

      try:
          pull_active_branch()
      except porcelain.CheckoutError as err:
          print("CheckoutError:", err)
          porcelain.fetch(localRep)
          pull_active_branch()
  def reset(localRep):
      """Hard-reset *localRep* using the tree of HEAD's first parent.

      Looks up the commit HEAD points to, takes its first parent and passes
      that parent's tree to ``porcelain.reset`` in ``'hard'`` mode. Indexing
      ``parents[0]`` fails when the HEAD commit has no parents.
      """
      repo = Repo(localRep)
      tip = repo[repo.head()]
      first_parent = repo[tip.parents[0]]
      porcelain.reset(repo, 'hard', first_parent.tree)
  def recovery(localRep):
      """Run ``porcelain.reset`` in ``'hard'`` mode on *localRep*.

      No explicit target is given, so ``porcelain.reset`` uses its default.
      """
      porcelain.reset(Repo(localRep), 'hard')
  def clone(source, target=None, depth: int = None):
      """Clone *source* via ``porcelain.clone``.

      Args:
          source: Location of the repository to clone.
          target: Destination forwarded to ``porcelain.clone``.
          depth: Forwarded to ``porcelain.clone`` as ``depth``.

      The value returned by ``porcelain.clone`` is discarded.
      """
      porcelain.clone(
          source,
          target,
          depth=depth,
      )
  def isOnline(localRep, timeout=10):
      """Report whether the repository's remote answers an HTTP GET with 200.

      Args:
          localRep: Path of the local repository.
          timeout: Seconds ``requests.get`` may wait for the remote. Without
              a timeout the request can block indefinitely on an
              unresponsive host, which defeats the purpose of this probe.

      Returns:
          ``True`` if the GET request to the remote location returns status
          code 200, ``False`` for any other status or if the request raises.
      """
      r = Repo(localRep)
      _remote_name, remote_location = get_remote_repo(r, None)
      try:
          response = requests.get(remote_location, timeout=timeout)
      except Exception:
          # Best-effort probe: any failure to reach the remote means offline.
          return False
      return response.status_code == 200
  def swRemote(localRep, remote_url):
      """Point the ``origin`` remote of *localRep* at *remote_url*.

      Args:
          localRep: Path of the local repository.
          remote_url: URL to register as ``origin``.

      Any existing ``origin`` is removed first; a failure to remove it
      (typically because it does not exist) is ignored. Errors raised while
      adding the new remote propagate to the caller.
      """
      r = Repo(localRep)
      try:
          porcelain.remote_remove(r, 'origin')
      except Exception:
          # No removable origin: fall through and add it. Unlike the previous
          # bare ``except:``, KeyboardInterrupt/SystemExit are not swallowed.
          pass
      # Added exactly once, outside the try, so a failing add is neither
      # retried nor mistaken for a failed removal.
      porcelain.remote_add(r, 'origin', remote_url)
  def swRemote_force(localRep, remote_url):
      """Switch ``origin`` to *remote_url*, then fetch and force-pull.

      Args:
          localRep: Path of the local repository.
          remote_url: URL handed to ``swRemote``.

      After the remote is switched, the repository is fetched and the
      currently active branch is pulled with ``force=True``.
      """
      swRemote(localRep, remote_url)
      porcelain.fetch(localRep)
      current_branch = porcelain.active_branch(localRep)
      porcelain.pull(localRep, refspecs=current_branch, force=True)
  if __name__ == "__main__":
      # Script entry point: print the history of the repository at /home/backup.
      localrep = "/home/backup"
      log(localrep)