Leetcode SubCategorizer [part1 - DATA]
Unsupervised Leetcode Categorizer
1. Idea of the product & problem statement
Most people who want to crack the coding interview start practicing on platforms like LeetCode, and most MAANG companies ask questions from a broad range of data structures & algorithms topics. If you don’t have a guide or a “curated list of problems”, you will get lost in the ocean of problems. A real case:
- There are more than 1000 problems under the Array tag alone. If you start from Array and work through them one by one, you will not be able to move on to other topics in less than two years. But your interview is scheduled for next month, so you have one month to prepare, and during that month you want to see as many different kinds of problems as possible, to reduce the chance of facing an “unseen” problem in the interview.
Problem statement: the main groups of algorithms & data structures such as Math, Array, Graphs, Dynamic Programming, etc. each have more than 300 problems under their domain. We need to divide such big groups into subgroups that contain as many distinct problems as possible and represent the domain as fully as possible. That way you will not waste your time solving the same kind of problem repetitively. For example, on the LeetCode platform there are ~400 problems under the Dynamic Programming category. It’s a huge list, but DP on its own can be divided into the following subgroups:
- Digit DP
- Bitmasking DP
- Matrix DP
- Tree DP
- Graph DP etc.
There is no ready-made dataset on the internet, so we will go through all the details step by step. The plan for this part is as follows:
- Deciding on Web Crawling or API methods
- Architecting data pipelines
- Developing the data collector as per step #2
1.1 Data Source Identification:
The LeetCode platform doesn’t provide official APIs. Therefore, to discover the internal APIs, I set up a “man in the middle” SSL-bumping scenario. I installed a Squid proxy on my laptop (localhost), then created my own Certificate Authority for issuing custom certificates and installed that CA on the laptop as a “trusted CA”. I configured the iptables firewall (SSL redirection, port 443) and Squid in SSL-interception mode, and redirected all traffic through the Squid proxy. What happens in this case?
Here the server is our local Squid proxy: it accepts all SSL tunnel requests, decrypts them, re-encrypts them with our CA, and forwards them to the LeetCode servers. When the responses come back from LeetCode, the firewall sends them to the Squid proxy again, which decrypts the tunnel, re-encrypts it with our CA, and sends it to our browser. Because the traffic is encrypted with our own CA certificate, we can decrypt it, which means we can read the bodies of the HTTPS messages and see the structure of the HTTP POST requests.
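With the traffic decrypted, the GraphQL endpoint (https://leetcode.com/graphql) and the payload shape become visible, and a single standalone request is enough to verify them outside the browser. Below is a minimal sketch of such a check; the requests library and the "two-sum" slug are used purely for illustration, the field list mirrors the query used by the collector later, and depending on LeetCode's anti-bot rules extra headers (e.g. a Referer) may be required:

import requests

# hypothetical one-off verification of the endpoint discovered via SSL bumping
graphql_query = """
query getQuestionDetails($titleSlug: String!) {
  question(titleSlug: $titleSlug) {
    title
    content
    topicTags { name slug }
  }
}
"""

resp = requests.post(
    "https://leetcode.com/graphql",
    json={"query": graphql_query,
          "variables": {"titleSlug": "two-sum"}},
    timeout=10)
print(resp.status_code)
print(resp.json()["data"]["question"]["title"])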
1.2 Data Collection & Pipelines:
The data collection pipeline should be:
- scalable & flexible
- highly available
- fast
As we know, Python applications run single-threaded by default (because of the GIL, the Global Interpreter Lock). Even a multithreaded Python app is not “truly multithreaded”: at any moment of CPU time only a single Python thread runs on a single CPU core, and thread context switching adds its own processing cost on top. Therefore, I decided to use the asyncio asynchronous paradigm. The architecture of the application is as follows:
Generator: -> the questionGenerator() async function retrieves the full list of algorithmic problems from the LeetCode servers. It is an async generator (coroutine), so it consumes very little memory thanks to the nature of Python generators.
Producers: -> there can be many of them (easily scalable); they read from the generator and write into the asyncio.Queue().
Async Queue: -> it’s a queue, but why async? Because put() suspends (awaits) once the size of the queue reaches max_queue_size. Its role is to act as a channel between the producers and consumers (a minimal sketch of this behaviour follows after this list).
Consumers: -> there can be many of them (easily scalable); they read from the async queue and process the body of each question (sending a GraphQL request to LeetCode). Also, because of LeetCode’s DDoS/rate-limit protection, if the server denies a request, the consumer returns that question to the async queue and processes it later.
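Before the full collector, the snippet below is a minimal, self-contained sketch of exactly this producer/queue/consumer wiring (all names in it are illustrative, not part of the real collector); note how put() suspends once the bounded queue is full:

import asyncio

async def toy_producer(queue):
    for item in range(5):
        # put() suspends here whenever the queue already holds maxsize items
        await queue.put(item)
        print(f"produced {item}")

async def toy_consumer(queue):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.1)  # simulate slow processing (e.g. an HTTP call)
        print(f"consumed {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=2)  # bounded queue -> backpressure on producers
    consumer_task = asyncio.create_task(toy_consumer(queue))
    await toy_producer(queue)         # producer returns once everything is queued
    await queue.join()                # wait until every item has been processed
    consumer_task.cancel()

asyncio.run(main())

The full collector below follows the same pattern, with the generator and consumers doing the actual HTTP calls via aiohttp.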
import asyncio
import aiohttp
from aiohttp.resolver import AsyncResolver
import ujson
import logging
import pickle
logging.basicConfig(filename='/akhalilli/data/appx.log',
encoding='utf-8',
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
graphql_query = """
query getQuestionDetails($titleSlug: String!) {
  question(titleSlug: $titleSlug) {
    content
    title
    titleSlug
    likes
    dislikes
    similarQuestions
    topicTags {
      name
      slug
    }
    stats
    hints
  }
}
"""
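# global in-memory stores shared by the producers and consumers:
#   questionmap   : question_id -> metadata and body of the question
#   qslugtoidmap  : title slug  -> question_id
#   qtitletoidmap : title       -> question_id (used to resolve similarQuestions)
#   slugs         : helper list of slugs (not used further in this script)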
questionmap = {}
qslugtoidmap = {}
qtitletoidmap = {}
slugs = []
async def questionGenerator():
    logging.info(
        "generator started to run; this line runs only once during the app's lifecycle")
    listofproblems = {}  # stays empty if the request below fails
    # fetch the full list of algorithm problems from LeetCode
logging.info(f"establishing TCP session, and HTTP over the TCP")
try:
async with aiohttp.ClientSession() as session:
async with session.get("https://leetcode.com/api/problems/algorithms") as resp:
logging.info(
"getting the response of problems list from leetcode servers")
listofproblems = ujson.loads(await resp.text())
except Exception as ex:
logging.error(f"exception occurred: {ex}")
    # critical part: since this is a generator, the full list is streamed item by
    # item and memory usage stays low
    for qitem in listofproblems.get('stat_status_pairs', []):
if not qitem['paid_only']:
yield qitem
async def producer(name, queue, qgenerator):
    # reads question items from the generator (which lazily fetches the problems
    # list) and does the first processing before feeding the consumer queue
logging.info(
f"producer: {name}, getting question_items from the generator")
async for qitem in qgenerator:
qid = qitem['stat']['frontend_question_id']
logging.info(
f"producer: {name}, building the first processing of the question: {qid}")
questionmap[qid] = dict()
questionmap[qid]['qtitle'] = qitem['stat']['question__title']
questionmap[qid]['qslug'] = qitem['stat']['question__title_slug']
questionmap[qid]['isnew'] = qitem['stat']['is_new_question']
questionmap[qid]['accepted'] = qitem['stat']['total_acs']
questionmap[qid]['submitted'] = qitem['stat']['total_submitted']
questionmap[qid]['difficulty'] = qitem['difficulty']['level']
qslugtoidmap[questionmap[qid]['qslug']] = qid
qtitletoidmap[qitem['stat']['question__title']] = qid
# critical step is here, putting (qid, slug) pair into the consumer queue
logging.info(
f"producer: {name}, putting question: {qid} into the async queue")
await queue.put((qid, questionmap[qid]['qslug']))
async def consumer(name, queue):
    # custom DNS resolver and a bounded TCP connector, reused for the whole
    # lifetime of the consumer so the connection pool stays persistent
    resolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
    tcpconnector = aiohttp.TCPConnector(
        limit=10, limit_per_host=10, resolver=resolver)
    logging.info(
        f"consumer: {name}, establishing TCP session, and HTTP over the TCP")
    async with aiohttp.ClientSession(connector=tcpconnector) as session:
while True:
try:
qid, questionslug = await queue.get()
logging.info(
f"consumer: {name}, starting to process body of question: {qid}")
variables = {"titleSlug": questionslug}
payload = {"getQuestionDetails": "questionData",
"query": graphql_query, "variables": variables}
# get question details and
logging.info(
f"consumer: {name}, getting body of the question: {qid}")
async with session.post('https://leetcode.com/graphql', json=payload) as resp:
                    logging.debug(
                        f"consumer: {name}, question: {qid}, HTTP status: {resp.status}")
questionBody = ujson.loads(await resp.text())
# processing the response
questionData = questionBody['data']['question']
questionmap[qid]['content'] = questionData['content']
questionmap[qid]['tags'] = [tagitem['name']
for tagitem in questionData['topicTags']]
questionmap[qid]['likes'] = questionData['likes']
questionmap[qid]['dislikes'] = questionData['dislikes']
questionmap[qid]['similarquestions'] = [qtitletoidmap[similaritem['title']] for similaritem in ujson.loads(
questionData['similarQuestions']) if similaritem['title'] in qtitletoidmap]
queue.task_done()
logging.info(
f"consumer: {name}, processed question: {qid} successfully")
            except Exception as ex:
                # request failed (e.g. rate-limited/denied by LeetCode): log it and
                # put the question back into the queue so it gets retried later
                logging.error(
                    f"consumer: {name}, exception: {ex}, qid: {qid}, qslug: {questionslug}, "
                    f"qsize: {queue.qsize()}, returning the question back to the queue")
                queue.task_done()
                await queue.put((qid, questionslug))
async def bigBang(queue_max_size):
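    # bounded queue: producers suspend on put() once queue_max_size items are pending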
queue = asyncio.Queue(maxsize=queue_max_size)
questionGen = questionGenerator()
consumer_tasks = [asyncio.create_task(
consumer(f"consumer_{i}", queue)) for i in range(2)]
producer_tasks = [asyncio.create_task(
producer(f"producer_{i}", queue, questionGen)) for i in range(1)]
await asyncio.gather(*producer_tasks)
await queue.join()
logging.info("Everything is finished, killing consumers..")
    for consumer_task in consumer_tasks:
        consumer_task.cancel()
logging.info("consumers stopped successfully!")
def serialize(filename, ds):
with open(filename, 'wb') as f:
pickle.dump(ds, f)
if __name__ == "__main__":
logging.info("START!")
    asyncio.run(bigBang(queue_max_size=2000))
serialize('/akhalilli/data/questionmap.pickle', questionmap)
serialize('/akhalilli/data/qslugtoidmap.pickle', qslugtoidmap)
serialize('/akhalilli/data/qtitletoidmap.pickle', qtitletoidmap)
logging.info("DONE!")
The NEXT part is going to be NLP preprocessing.
Further Research
- SSL Bumping [SSL Interception]
- Async paradigm (Python Async Queue, asyncIO)