Python3 Script to Retrieve Entries using LDAP3
Below is a script I wrote to pull large data sets out of Active Directory using Python 3 and ldap3. I didn't find any blog posts that covered my particular use case, so I figured I would post a working version. I have replaced all sensitive information with <TRUNC>, so be sure to substitute your own details (e.g. server and OU information). Most of the script is commented and hopefully contains enough information to be understandable, and adaptable, by people who, like me, were stuck with the examples given in the ldap3 documentation and various GitHub repos and blog posts.
The primary issue for me was that I had to enable paging for ldap3 to pull the full result set correctly, and the documentation left me confused about the best way to do that. I also have to pull from multiple OUs, so that had to go into the loop as well.
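For what it's worth, my script below manages the paging cookie by hand, but ldap3 also ships a paged_search helper (conn.extend.standard.paged_search) that does the cookie bookkeeping for you and yields results as a generator. Here is a minimal sketch of that approach; the server name, bind account, and base DN are placeholders, not values from my environment:

from ldap3 import Server, Connection, ALL, SUBTREE

# Placeholder connection details -- substitute your own server, bind account, and base DN.
server = Server('ldaps.example.com', use_ssl=True, get_info=ALL)
conn = Connection(server, user='EXAMPLE\\binduser', password='password', auto_bind=True)

# generator=True streams one page at a time, so memory use stays flat even for large result sets.
entries = conn.extend.standard.paged_search(
    search_base='OU=Users,DC=example,DC=com',
    search_filter='(&(objectClass=user)(objectClass=person))',
    search_scope=SUBTREE,
    attributes=['sAMAccountName', 'givenName', 'sn'],
    paged_size=800,
    generator=True)

for entry in entries:
    # The generator also yields searchResRef entries, so filter on type.
    if entry['type'] == 'searchResEntry':
        print(entry['attributes']['sAMAccountName'])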
If you have any questions, please feel free to comment on the blog or reach out via the contact page. The code is below, but if you would rather download it and read it in your own IDE, you can find it here:
This script uses Python 3 to pull a large set (>150,000) of Active Directory entries and export the results to JSON and CSV for Logstash and Splunk to read from.
You can also download it from my Git repo at https://github.com/rglynn/SecurityProjects; you only need the Python3_LDAP_Pull directory.
Below is the raw code.
# This script uses Python 3 to pull a large set (>150,000) of Active Directory entries and export
# the results to JSON and CSV for Logstash and Splunk to read from.
from ldap3 import Server, Connection, ALL, NTLM
import json
import os
import re
import sys
import csv
import base64
from time import gmtime, strftime

# Result file locations, relative to a Unix operating system.
combined_cat_ad = '/etc/addata/results/combined_cat_ad.yaml'
combined_dep_ad = '/etc/addata/results/combined_dep_ad.yaml'
combined_fn_ad = '/etc/addata/results/combined_fn_ad.yaml'
combined_ad_json = '/etc/addata/results/ad_json.json'
final_cat_ad = '/etc/addata/results/cat_ad.yaml'
final_dep_ad = '/etc/addata/results/dep_ad.yaml'
final_fn_ad = '/etc/addata/results/fn_ad.yaml'

# Reads the config file below, grabs the base64-encoded username and password from particular lines, and decodes them.
# Obviously nowhere near as good as actually encrypting the credentials, but it's the best I have for the time being.
config = open('/etc/addata/config.txt', 'r').readlines()
user = base64.b64decode(config[0])
pw = base64.b64decode(config[1])

# ###Enter AD User Info -- need to figure out a way to not hardcode in the future
cusr = user.decode('utf-8')
cpwd = pw.decode('utf-8')


# Function to combine files -- used at the end after AD data is gathered from multiple DNs
def combinefiles(inputfiles, outputfile):
    with open(outputfile, 'w') as outfile:
        for fname in inputfiles:
            with open(fname) as infile:
                for line in infile:
                    outfile.write(line)


# Function to deduplicate lines in a file -- used on the YAML files at the end
def dedupefiles(inputfile, dedupedfile):
    lines = open(inputfile, 'r').readlines()
    lines_set = set(lines)
    out = open(dedupedfile, 'w')
    for line in lines_set:
        out.write(line)
    out.close()


# Log file used to record progress; entries are also printed if the script is run manually
writetolog = '/etc/addata/logs/ad_grabber.log'


# Defines a function to write each activity to a log file for troubleshooting.
def writelog(file_name, textstring):
    logtime = strftime("%Y-%m-%d %H:%M:%S", gmtime())
    logfile = open(file_name, 'a+')
    print("%s %s" % (logtime, textstring))
    logfile.write("%s ad_grabber: %s\n" % (logtime, textstring))
    logfile.close()


# Iterate through multiple user DNs when specified
multidn = ['<TRUNC>', '<TRUNC>']  ###Line truncated to remove any specific code
for dn in multidn:
    if dn == "<TRUNC>":  ###Line truncated to remove any specific code
        dn_dir = "AllUsers"
    else:
        dn_dir = "Migration"
    orig_json = '/etc/addata/results/%s_orig_json.json' % dn_dir
    enhanced_json = '/etc/addata/results/%s_enhanced_data.json' % dn_dir
    csv_convert = '/etc/addata/results/%s_csvexport.csv' % dn_dir
    final_ad = '/etc/addata/results/%s_ad_json.json' % dn_dir
    ad_dept = '/etc/addata/results/%s_dep_ad.yaml' % dn_dir
    ad_cat = '/etc/addata/results/%s_cat_ad.yaml' % dn_dir
    ad_fn = '/etc/addata/results/%s_fn_ad.yaml' % dn_dir

    # Wiping old results
    for oldfile in (orig_json, enhanced_json, csv_convert, final_ad):
        try:
            os.remove(oldfile)
        except OSError:
            pass

    writelog(writetolog, 'Connecting to AD Server for DN: %s' % dn_dir)
    server = Server('<TRUNC>', use_ssl=True, get_info=ALL)  ###Line truncated to remove any specific code
    conn = Connection(server, user=cusr, password=cpwd, auto_bind=True)
    basedn = dn
    writelog(writetolog, 'Connection Established.')
    writelog(writetolog, 'Gathering AD Objects...')

    # Search Criteria and Returning Attributes
    cookie = 'new_cookie'
    searchFilter = "(&(objectClass=user)(objectClass=person)(!(objectClass=computer))(!(sAMAccountName=$*))" \
                   "(!(sAMAccountName=#*))(!(sAMAccountName=a-*))(!(sAMAccountName=w-*)))"
    searchAttributes = ["sAMAccountName", "givenName", "sn", "mail", "l", "co", "telephoneNumber", "mobile",
                        "businessUnitDesc", "manager", "whenCreated", "department", "title",
                        "extensionAttribute1", "managerLevelDesc", "distinguishedName"]
    user_atrb_num = 0

    # The with statement opens the output file and the loop pulls pages of 800 results per search.
    # I noticed with pulls larger than 1000, the results would time out at times.
    # The time outs appeared to be primarily due to the large values within some fields.
    with open(orig_json, 'w') as myfile:
        myfile.write('{"users": { ')
        while cookie:
            if cookie == "new_cookie":
                # First pull: no paging cookie yet
                conn.search(basedn, searchFilter, attributes=searchAttributes, paged_size=800)
                for entry in conn.entries:
                    user_atrb_num += 1
                    json_ldap = entry.entry_to_json()
                    myfile.write('"%s":' % user_atrb_num)
                    myfile.write('%s\n' % json_ldap)
                    myfile.write(',')
                print(str(user_atrb_num) + " objects processed.")
            # Grab the paging cookie returned by the previous search
            try:
                cookie = conn.result['controls']['1.2.840.113556.1.4.319']['value']['cookie']
            except KeyError:
                writelog(writetolog, 'Error: connection failed. Check connection user and password.')
                sys.exit()
            if not cookie:
                # An empty cookie means the last page has already been processed
                break
            conn.search(basedn, searchFilter, attributes=searchAttributes, paged_size=800, paged_cookie=cookie)
            for entry in conn.entries:
                user_atrb_num += 1
                json_ldap = entry.entry_to_json()
                myfile.write('"%s":' % user_atrb_num)
                myfile.write('%s\n' % json_ldap)
                myfile.write(',')
            print(str(user_atrb_num) + " objects processed.", end='\r')

    logstring = "%s total objects processed." % user_atrb_num
    writelog(writetolog, logstring)
    writelog(writetolog, 'Fixing JSON...')

    # Reformats the JSON into an acceptable format for Elasticsearch:
    # strips the trailing comma left by the loop above and closes the braces.
    with open(orig_json, 'rb+') as myfile:
        myfile.seek(-1, os.SEEK_END)
        myfile.truncate()
    with open(orig_json, 'a') as myfile:
        myfile.write('}}')
    writelog(writetolog, 'Objects written.')

    config = json.loads(open(orig_json).read())
    writelog(writetolog, 'Enhancing Data...')
    cnt_usermanager = 0
    cnt_userfirst = 0
    cnt_userlast = 0

    # Loops through users and enhances the JSON data with new fields calculated from the returned attributes.
    # This is use case specific and is only kept here in case someone wants to do something similar.
    for obj in config["users"]:
        # Regex to extract the manager EID into its own field
        user_manager = str(config["users"][obj]["attributes"]["manager"])
        p = re.compile('CN=([^,]+)')
        user_manager = p.search(user_manager)
        try:
            user_manager = user_manager[1]
        except TypeError:
            cnt_usermanager += 1
            user_manager = 'Null'
        config["users"][obj]["attributes"]["managedBy"] = user_manager
        # End manager extraction

        # Calculate Category
        user_title = str(config["users"][obj]["attributes"]["title"])
        r = re.compile(r'\[(.*)\]')
        user_title = r.search(user_title)
        if user_title[1] == "'Non-associate'":
            user_category = "Non-associate"
        else:
            user_category = "Associate"
        config["users"][obj]["attributes"]["Category"] = user_category
        # End Category Calculation

        # Full Name
        try:
            First = json.dumps(config["users"][obj]["attributes"]["givenName"][0])
        except IndexError:
            cnt_userfirst += 1
            First = 'Null'
        try:
            Last = json.dumps(config["users"][obj]["attributes"]["sn"][0])
        except IndexError:
            cnt_userlast += 1
            Last = 'Null'
        Full_Name = str(First) + " " + str(Last)
        Full_Name = Full_Name.replace('\"', '')
        config["users"][obj]["attributes"]["Full_Name"] = Full_Name

    # Logs the number of erroneous values
    writelog(writetolog, 'Found %s null values for manager. Replacing with null.' % cnt_usermanager)
    writelog(writetolog, 'Found %s null values for user first name. Replacing with null.' % cnt_userfirst)
    writelog(writetolog, 'Found %s null values for user last name. Replacing with null.' % cnt_userlast)
    writelog(writetolog, 'Saving enhanced file...')

    # Writing enhanced JSON to file
    with open(enhanced_json, 'w') as myfile2:
        json.dump(config, myfile2)
    writelog(writetolog, 'File saved.')

    # Converting to CSV for additional manipulations
    writelog(writetolog, 'Converting to CSV...')
    config2 = json.loads(open(enhanced_json).read())
    with open(csv_convert, 'w', encoding='utf-8') as myfile3:
        # Header row, taken from the first user's attribute names
        for obj1 in config2["users"]["1"]["attributes"]:
            myfile3.write('%s,' % obj1)
        for uid in config2["users"]:
            print("Currently converting user %s to csv." % uid, end='\r')
            myfile3.write('\n')
            for attribute in config2["users"]["1"]["attributes"]:
                if attribute != "managedBy" and attribute != "Category" and attribute != "Full_Name":
                    # Attributes returned by ldap3 are lists -- take the first value
                    try:
                        user_atrb = str(config2["users"][uid]["attributes"][attribute][0])
                        user_atrb = '"' + user_atrb + '"'
                    except IndexError:
                        user_atrb = ''
                else:
                    # The calculated fields added above are plain strings
                    try:
                        user_atrb = str(config2["users"][uid]["attributes"][attribute])
                        user_atrb = '"' + user_atrb + '"'
                    except IndexError:
                        user_atrb = ''
                myfile3.write('%s,' % user_atrb)
    logstring = "Finished converting %s users to csv format.\n" % (uid)
    writelog(writetolog, logstring)

    # The original JSON output above is used to write the CSV, but that JSON is not line delimited.
    # The below converts the file into line-delimited JSON so that Filebeat doesn't throw a fit.
    writelog(writetolog, 'Converting JSON output into line delimitation...')
    csvfile = open(csv_convert, 'r')
    jsonfile = open(final_ad, 'w')
    deptfile = open(ad_dept, 'w')
    catfile = open(ad_cat, 'w')
    fnfile = open(ad_fn, 'w')
    fieldnames = ("businessUnitDesc", "co", "department", "distinguishedName", "extensionAttribute1",
                  "givenName", "l", "mail", "manager", "managerLevelDesc", "mobile", "sAMAccountName",
                  "sn", "telephoneNumber", "title", "whenCreated", "managedBy", "Category", "Full_Name",)
    reader = csv.DictReader(csvfile, fieldnames)
    rowstep = 0
    for row in reader:
        if rowstep != 0:  # skip the header row
            json.dump(row, jsonfile)
            jsonfile.write('\n')
        rowstep += 1
    writelog(writetolog, 'Finished.')
    csvfile.close()

    # Converting individual AD objects into a 1:1 key/value format in YAML (user : value) --
    # this is because Logstash cannot do 1:many translations.
    writelog(writetolog, 'Converting to YAML...')
    csvfile = open(csv_convert, 'r')
    reader = csv.reader(csvfile)
    rowstep = 0
    for row in reader:
        if rowstep != 0:  # skip the header row
            # ADEnrichlookup YAML Conversion
            dep = row[2]
            eid = row[11].lower()
            cat = row[17]
            fulln = row[18]
            # Department
            deptfile.write('"%s" : "%s"\n' % (eid, dep))
            # Category
            catfile.write('"%s" : "%s"\n' % (eid, cat))
            # Full Name
            fnfile.write('"%s" : "%s"\n' % (eid, fulln))
        rowstep += 1
    # Close the per-DN output files so everything is flushed before the combine step below
    csvfile.close()
    jsonfile.close()
    deptfile.close()
    catfile.close()
    fnfile.close()

# Combine the full AD JSON data from both DNs
writelog(writetolog, 'Combining JSON output...')
filenames = ['/etc/addata/results/AllUsers_ad_json.json', '/etc/addata/results/Migration_ad_json.json']
combinefiles(filenames, combined_ad_json)

# Combine the AD YAML files and dedupe
writelog(writetolog, 'Combining AD Category YAML output...')
filenames = ['/etc/addata/results/AllUsers_cat_ad.yaml', '/etc/addata/results/Migration_cat_ad.yaml']
combinefiles(filenames, combined_cat_ad)
dedupefiles(combined_cat_ad, final_cat_ad)

writelog(writetolog, 'Combining AD Department YAML output...')
filenames = ['/etc/addata/results/AllUsers_dep_ad.yaml', '/etc/addata/results/Migration_dep_ad.yaml']
combinefiles(filenames, combined_dep_ad)
dedupefiles(combined_dep_ad, final_dep_ad)

writelog(writetolog, 'Combining AD Full Name YAML output...')
filenames = ['/etc/addata/results/AllUsers_fn_ad.yaml', '/etc/addata/results/Migration_fn_ad.yaml']
combinefiles(filenames, combined_fn_ad)
dedupefiles(combined_fn_ad, final_fn_ad)

# Cleanup of the intermediate per-DN and combined working files
writelog(writetolog, 'Cleaning up...')
cleanfiles = ['/etc/addata/results/AllUsers_ad_json.json', '/etc/addata/results/Migration_ad_json.json',
              '/etc/addata/results/AllUsers_cat_ad.yaml', '/etc/addata/results/Migration_cat_ad.yaml',
              '/etc/addata/results/AllUsers_orig_json.json', '/etc/addata/results/Migration_orig_json.json',
              '/etc/addata/results/AllUsers_csvexport.csv', '/etc/addata/results/Migration_csvexport.csv',
              '/etc/addata/results/AllUsers_dep_ad.yaml', '/etc/addata/results/Migration_dep_ad.yaml',
              '/etc/addata/results/AllUsers_enhanced_data.json', '/etc/addata/results/Migration_enhanced_data.json',
              '/etc/addata/results/AllUsers_fn_ad.yaml', '/etc/addata/results/Migration_fn_ad.yaml',
              '/etc/addata/results/combined_cat_ad.yaml', '/etc/addata/results/combined_dep_ad.yaml',
              '/etc/addata/results/combined_fn_ad.yaml']
for file in cleanfiles:
    try:
        os.remove(file)
    except OSError:
        pass
writelog(writetolog, 'Job done.')
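One more note on the credentials: the script expects /etc/addata/config.txt to contain the bind username on the first line and the password on the second, each base64 encoded. If you need to generate that file, a quick one-off snippet along these lines will do it (the account name here is a placeholder):

import base64
from getpass import getpass

# Placeholder bind account -- replace with the account the script should authenticate as.
username = 'EXAMPLE\\svc_adreader'
password = getpass('Bind account password: ')

# Write the base64-encoded username and password as the first two lines of the config file.
with open('/etc/addata/config.txt', 'w') as cfg:
    cfg.write(base64.b64encode(username.encode('utf-8')).decode('ascii') + '\n')
    cfg.write(base64.b64encode(password.encode('utf-8')).decode('ascii') + '\n')

Keep in mind base64 is encoding, not encryption, so lock the file down with filesystem permissions.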