mirror of
https://git.mia.jetzt/scrubber
synced 2025-01-10 15:11:52 -07:00
155 lines
4.6 KiB
Python
155 lines
4.6 KiB
Python
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
import psutil
|
|
import psycopg
|
|
|
|
from com import FilterAction, eval_config, parse_graph, progressbar
|
|
|
|
# Evaluate the user configuration and pull out the pieces this script uses.
config = eval_config()
# config["connect"] is a callable; its result is annotated as a psycopg
# connection (presumably it opens the database connection -- confirm in com).
conn: psycopg.Connection = config["connect"]()
token: str = config["token"]
api: str = config["api"]

graph = parse_graph()

print("reading filterlist")
# Each non-empty line of filtered.list is split on single spaces into fields.
filtered = [
    entry.split(' ')
    for entry in Path("filtered.list").read_text().strip().splitlines()
]

print("building queue")
# Filled by enqueue() with (note id, action) tuples.
queue = []
|
|
|
|
def enqueue(note, action):
    """Queue ``note`` and everything hanging off it for ``action``.

    Walks the note's replies first, then its quotes, recursing into each
    referenced note (looked up in the module-level ``graph``) before the
    note itself is considered, so descendants land in ``queue`` ahead of
    their parent. A note is appended as ``(note["id"], action)`` only when
    its ``flags`` contain ``"self"``.
    """
    for relation in ("replies", "quotes"):
        for child_id in note[relation]:
            enqueue(graph[child_id], action)

    if "self" not in note["flags"]:
        return
    queue.append((note["id"], action))
|
|
|
|
# Expand every filter-list entry into queued deletions. Each entry must hold
# exactly two fields: a note id present in graph and a FilterAction value.
# The loop variable is note_id (not id) so the builtin id() is not shadowed
# at module level for the rest of the script.
for note_id, action in filtered:
    enqueue(graph[note_id], FilterAction(action))
|
|
|
|
class CustomETA(progressbar.ETA):
    """ETA widget that estimates from the most recent progress changes.

    Instead of averaging over the whole run, the estimate is based on the
    elapsed-time stamps of the last (up to ten) calls on which the progress
    value changed.
    """

    def __init__(self, *args, **kwargs):
        # Elapsed seconds recorded each time the progress value changed;
        # trimmed to the ten most recent entries.
        self.history = []
        # Progress value seen on the previous call; None before the first call.
        self.lastval = None
        super().__init__(*args, **kwargs)

    def _calculate_eta(self, progress, data, value, elapsed):
        """Return the estimated number of seconds remaining (never negative).

        Every change of ``value`` is treated as one completed step. The time
        per step is the span covered by the recorded timestamps divided by
        the number of intervals between them, multiplied by the steps left
        (``progress.max_value - value``). Time already spent waiting since
        the last change is subtracted from the estimate. With fewer than two
        recorded changes there is nothing to average, so 0 is returned.
        """
        now = elapsed.total_seconds()
        if self.lastval != value:
            self.history = [*self.history[-9:], now]
            self.lastval = value

        # N timestamps delimit N - 1 intervals; dividing by N would
        # systematically under-estimate the time per step.
        intervals = len(self.history) - 1
        if intervals < 1:
            return 0

        per_item = (self.history[-1] - self.history[0]) / intervals
        remaining = (progress.max_value - value) * per_item
        spent = now - self.history[-1]
        return max(remaining - spent, 0)
|
|
|
|
# Widgets shown left to right, separated by single spaces: the free-form
# "message" variable, percentage, bar, "done/total" counter and the custom ETA.
_bar_widgets = [
    progressbar.Variable("message", format="{formatted_value}"),
    " ",
    progressbar.Percentage(),
    " ",
    progressbar.Bar(),
    " ",
    progressbar.SimpleProgress("%(value_s)s/%(max_value_s)s"),
    " ",
    CustomETA(),
]

# Progress runs from 0 to the number of queued notes. A "status" variable is
# registered as well, although no widget in the list above displays it.
pb = progressbar.ProgressBar(
    0,
    len(queue),
    widgets=_bar_widgets,
    variables=dict(status="work"),
)
pb.update(0)  # force initial display

# One shared HTTP client with a 60 second timeout for all API calls.
client = httpx.Client(timeout=60)

# Loop state: whether already-deleted notes are being skipped, and the
# time.time() of the last successful delete request (0 = none yet).
seeking = False
last_req = 0
|
|
|
|
# Minimum number of seconds between two successful delete requests.
MIN_DELETE_INTERVAL = 30


def _post_until_up(endpoint, payload):
    """POST ``payload`` as JSON to ``{api}/{endpoint}``, retrying on 502.

    While the server answers 502 the progress bar shows "down" and the
    request is repeated once per second. Any other response is returned.
    """
    while True:
        resp = client.post(f"{api}/{endpoint}", json=payload)
        if resp.status_code != 502:
            return resp
        pb.update(message="down")
        time.sleep(1)


for note, action in queue:

    # seek through queue
    # helps prevent rate limits on resumed deletions
    if seeking:
        resp = _post_until_up("notes/show", {
            "i": token,
            "noteId": note,
        })
        if resp.status_code == 404:
            pb.increment(message="seeking")
            continue
        seeking = False

    # wait for queue to empty
    while True:
        resp = _post_until_up("admin/queue/stats", {"i": token})
        # keep the last raw response around for debugging
        Path('queue-stats.dump').write_text(f"status:{resp.status_code}\nbody:\n{resp.text}")
        stats = resp.json()
        deliver_waiting = stats["deliver"]["waiting"]
        obliterate_waiting = stats["obliterate"]["waiting"]
        if deliver_waiting < 100 and obliterate_waiting < 50000:
            break
        pb.update(message=f"queue ({deliver_waiting}/{obliterate_waiting})")
        time.sleep(10)

    # make sure there's enough memory for new jobs (more than 512 MiB available)
    while psutil.virtual_memory().available <= (512 * 1024 * 1024):
        pb.update(message="memory")
        time.sleep(10)

    # prevent api rate limiting: wait out the REMAINDER of the minimum
    # interval since the last delete (sleeping for the elapsed time itself
    # would barely delay anything)
    since_last = time.time() - last_req
    if since_last < MIN_DELETE_INTERVAL:
        pb.update(message="delaying")
        time.sleep(MIN_DELETE_INTERVAL - since_last)

    # queue new deletions
    err = 0
    while True:
        resp = _post_until_up("notes/delete", {
            "i": token,
            "noteId": note,
            "obliterate": action == FilterAction.Obliterate,
        })
        if resp.status_code == 429:
            # "message" is the variable the bar actually displays
            pb.update(message="limit")
            time.sleep(1)
            continue
        if resp.status_code >= 400:
            body = resp.json()
            code = body["error"]["code"]
            if code == "NO_SUCH_NOTE":
                # already gone: skip ahead through the deleted notes
                pb.increment(message="seeking")
                seeking = True
                break
            if code == "QUEUE_FULL":
                print("\nobliterate queue overflowed, exiting to save server")
                # really stop; a bare break would only leave the retry loop
                # and carry on with the next note
                sys.exit(1)
            err += 1
            if err > 3:
                raise Exception(f"{code}: {body['error']['message']}")
            sys.stdout.write("\r")
            print(f"err {code} {body['error']['message']} ")
            time.sleep(30)
            # retry the same note instead of counting it as deleted
            continue
        pb.increment(message="deleting")
        last_req = time.time()
        break

pb.finish()
|