Python3 Script to Retrieve Entries using LDAP3
Below is a script I wrote to pull large data sets from Active Directory using Python 3 and ldap3. I didn't see any blogs that covered my particular use case, so I figured I would post a working version. I have replaced all sensitive information with <TRUNC>, so be sure to substitute your own details (server, OU information, and so on). Most of the script is commented and hopefully contains enough information to be understood and adapted by people like me who were stuck with the examples in the ldap3 documentation and various GitHub/blog posts.
I would say the primary issue for me was that I had to enable paging for ldap3 to pull the results correctly, and the documentation confused me as to the best way to do that. I also have to pull from multiple OUs, so I had to work that into the loop as well.
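For reference, here is a minimal sketch of the paging piece on its own, using ldap3's built-in paged_search helper and looping over multiple OUs. The server, credentials, and base DNs below are placeholders you would swap for your own; the full script further down drives the paging cookie manually instead, but this is the shortest way I know to get paging working:
from ldap3 import Server, Connection, ALL, SUBTREE

server = Server('ldaps://dc.example.com', use_ssl=True, get_info=ALL)  # placeholder server
conn = Connection(server, user='EXAMPLE\\svc_ldap', password='changeme', auto_bind=True)  # placeholder creds

base_dns = ['OU=AllUsers,DC=example,DC=com', 'OU=Migration,DC=example,DC=com']  # placeholder OUs
for base_dn in base_dns:
    # generator=True yields one result at a time and handles the paging cookie internally
    results = conn.extend.standard.paged_search(base_dn, '(objectClass=user)',
                                                search_scope=SUBTREE,
                                                attributes=['sAMAccountName', 'mail'],
                                                paged_size=800, generator=True)
    for result in results:
        print(result.get('dn'), result.get('attributes', {}))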
If you have any questions, please feel free to comment on the blog or contact me via the contact page.
This script uses Python 3 to pull a large set (>150,000) of Active Directory entries and export the results to JSON and CSV for Logstash and Splunk to read from.
If you would rather download the script and read it in your own IDE, you can grab it from my Git repo: https://github.com/rglynn/SecurityProjects (you only need the Python3_LDAP_Pull directory).
Below is the raw code.
# This script uses Python 3 to pull a large set (>150,000) of Active Directory entries and export
# the results to JSON and CSV for Logstash and Splunk to read from.
from ldap3 import Server, Connection, ALL, NTLM
import json
import os
import re
import sys
import csv
import base64
from time import gmtime, strftime
# Setting result locations (paths assume a Unix-like operating system).
combined_cat_ad = '/etc/addata/results/combined_cat_ad.yaml'
combined_dep_ad = '/etc/addata/results/combined_dep_ad.yaml'
combined_fn_ad = '/etc/addata/results/combined_fn_ad.yaml'
combined_ad_json = '/etc/addata/results/ad_json.json'
final_cat_ad = '/etc/addata/results/cat_ad.yaml'
final_dep_ad = '/etc/addata/results/dep_ad.yaml'
final_fn_ad = '/etc/addata/results/fn_ad.yaml'
config = open('/etc/addata/config.txt', 'r').readlines()
# Reads the config file above: line 1 is a base64-encoded username and line 2 is a base64-encoded password,
# both decoded below. Obviously nowhere near as good as actually encrypting the credentials,
# but it's the best I have for the time being.
user = base64.b64decode(config[0])
pw = base64.b64decode(config[1])
# ###Enter AD User Info -- need to figure out a way to not hardcode in the future
cusr = user.decode('utf-8')
cpwd = pw.decode('utf-8')
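# For reference, one way (my assumption, not part of the original setup) to generate the config.txt
# that the lines above expect -- two lines, each a base64-encoded string:
#     import base64
#     with open('/etc/addata/config.txt', 'w') as cfg:
#         cfg.write(base64.b64encode(b'DOMAIN\\svc_account').decode() + '\n')  # line 1: username (example)
#         cfg.write(base64.b64encode(b'P@ssw0rdHere').decode() + '\n')         # line 2: password (example)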
# Function to combine files -- used at the end after AD data is gathered from multiple DNs
def combinefiles(inputfiles, outputfile):
    with open(outputfile, 'w') as outfile:
        for fname in inputfiles:
            with open(fname) as infile:
                for line in infile:
                    outfile.write(line)
# Function to deduplicate lines in a file -- used on the YAML files at the end
def dedupefiles(inputfile, dedupedfile):
    lines = open(inputfile, 'r').readlines()
    lines_set = set(lines)
    with open(dedupedfile, 'w') as out:
        for line in lines_set:
            out.write(line)
# Write log function to log progress to file and print if run manually
writetolog = '/etc/addata/logs/ad_grabber.log'
# Defines a function to write each activity to a log file for troubleshooting.
def writelog(file_name, textstring):
    logtime = strftime("%Y-%m-%d %H:%M:%S", gmtime())
    print("%s %s" % (logtime, textstring))
    with open(file_name, 'a+') as logfile:
        logfile.write("%s ad_grabber: %s\n" % (logtime, textstring))
# Iterate through multiple user DNs when specified
multidn = ['<TRUNC>', '<TRUNC>'] ###Line truncated to remove any specific code
for dn in multidn:
    if dn == "<TRUNC>":  ###Line truncated to remove any specific code
        dn_dir = "AllUsers"
    else:
        dn_dir = "Migration"
    orig_json = '/etc/addata/results/%s_orig_json.json' % dn_dir
    enhanced_json = '/etc/addata/results/%s_enhanced_data.json' % dn_dir
    csv_convert = '/etc/addata/results/%s_csvexport.csv' % dn_dir
    final_ad = '/etc/addata/results/%s_ad_json.json' % dn_dir
    ad_dept = '/etc/addata/results/%s_dep_ad.yaml' % dn_dir
    ad_cat = '/etc/addata/results/%s_cat_ad.yaml' % dn_dir
    ad_fn = '/etc/addata/results/%s_fn_ad.yaml' % dn_dir
    # Wiping old results for this DN, if any exist
    for oldfile in (orig_json, enhanced_json, csv_convert, final_ad):
        try:
            os.remove(oldfile)
        except OSError:
            pass
    writelog(writetolog, 'Connecting to AD Server for DN: %s' % dn_dir)
    server = Server('<TRUNC>', use_ssl=True, get_info=ALL)  ###Line truncated to remove any specific code
    conn = Connection(server, user=cusr, password=cpwd, auto_bind=True)
    basedn = dn
    writelog(writetolog, 'Connection Established.')
    writelog(writetolog, 'Gathering AD Objects...')
    # Search criteria and returned attributes
    cookie = 'new_cookie'
    searchFilter = "(&(objectClass=user)(objectClass=person)(!(objectClass=computer))(!(sAMAccountName=$*))" \
                   "(!(sAMAccountName=#*))(!(sAMAccountName=a-*))(!(sAMAccountName=w-*)))"
    searchAttributes = ["sAMAccountName", "givenName", "sn", "mail", "l", "co", "telephoneNumber", "mobile",
                        "businessUnitDesc", "manager", "whenCreated", "department", "title", "extensionAttribute1",
                        "managerLevelDesc", "distinguishedName"]
    user_atrb_num = 0
    # The with statement opens the output file; the loop below pages through the results 800 at a time
    # (after a small initial pull that exists mainly to obtain the paging cookie).
    # I noticed that with pulls larger than 1000 the results would sometimes time out,
    # apparently due to the large values within some fields.
    with open(orig_json, 'w') as myfile:
        myfile.write('{"users": { ')
        while cookie:
            if cookie == "new_cookie":
                conn.search(basedn, searchFilter, attributes=searchAttributes, paged_size=5)
                for entry in conn.entries:
                    user_atrb_num += 1
                    json_ldap = entry.entry_to_json()
                    myfile.write('"%s":' % user_atrb_num)
                    myfile.write('%s\n' % json_ldap)
                    myfile.write(',')
                    print(str(user_atrb_num) + " cookie objects processed.")
            try:
                cookie = conn.result['controls']['1.2.840.113556.1.4.319']['value']['cookie']
            except KeyError:
                writelog(writetolog, 'Error: connection failed. Check connection user and password.')
                sys.exit()
            conn.search(basedn, searchFilter, attributes=searchAttributes, paged_size=800, paged_cookie=cookie)
            for entry in conn.entries:
                user_atrb_num += 1
                json_ldap = entry.entry_to_json()
                myfile.write('"%s":' % user_atrb_num)
                myfile.write('%s\n' % json_ldap)
                myfile.write(',')
                print(str(user_atrb_num) + " objects processed.", end='\r')
    logstring = "%s total objects processed." % user_atrb_num
    writelog(writetolog, logstring)
    writelog(writetolog, 'Fixing JSON...')
    # Reformats the JSON into an acceptable format for Elasticsearch:
    # drop the trailing comma left by the loop above, then close out the braces.
    with open(orig_json, 'rb+') as myfile:
        myfile.seek(-1, os.SEEK_END)
        myfile.truncate()
    with open(orig_json, 'a') as myfile:
        myfile.write('}}')
    writelog(writetolog, 'Objects written.')
    config = json.loads(open(orig_json).read())
    writelog(writetolog, 'Enhancing Data...')
    cnt_usermanager = 0
    cnt_userfirst = 0
    cnt_userlast = 0
    # Loops through users and enhances the JSON data with new fields calculated from the returned attributes.
    # This is use-case specific and is only kept here in case someone wants to do something similar.
for obj in config["users"]:
# Regex to extract Manager EID into its own field
user_manager = str(config["users"][obj]["attributes"]["manager"])
p = re.compile('CN=([^,]+)')
user_manager = p.search(user_manager)
config["users"][obj]["attributes"]["managedBy"] = []
try:
user_manager = '' + user_manager[1] + ''
except TypeError:
cnt_usermanager += 1
user_manager = 'Null'
config["users"][obj]["attributes"]["managedBy"] = user_manager
# End manager extraction
# Calculate Category
user_title = str(config["users"][obj]["attributes"]["title"])
r = re.compile('\[(.*)\]')
user_title = r.search(user_title)
if user_title[1] == "'Non-associate'":
user_category = "Non-associate"
else:
user_category = "Associate"
config["users"][obj]["attributes"]["Category"] = user_category
# End Category Calculation
# Full Name
try:
First = json.dumps(config["users"][obj]["attributes"]["givenName"][0])
except IndexError:
cnt_userfirst += 1
First = 'Null'
try:
Last = json.dumps(config["users"][obj]["attributes"]["sn"][0])
except IndexError:
cnt_userlast += 1
Last = 'Null'
Full_Name = str(First) + " " + str(Last)
Full_Name = Full_Name.replace('\"', '')
config["users"][obj]["attributes"]["Full_Name"] = Full_Name
# Logs the number of erroneous values
writelog(writetolog, 'Found %s null values for manager. Replacing with null.' % cnt_usermanager)
writelog(writetolog, 'Found %s null values for user first name. Replacing with null.' % cnt_userfirst)
writelog(writetolog, 'Found %s null values for user last name. Replacing with null.' % cnt_userlast)
writelog(writetolog, 'Saving enhanced file...')
# Writing enhanced JSON to file
with open(enhanced_json, 'w') as myfile2:
json.dump(config, myfile2)
writelog(writetolog, 'File saved.')
    # Converting to CSV for additional manipulations
    writelog(writetolog, 'Converting to CSV...')
    row = ''
    config2 = json.loads(open(enhanced_json).read())
    with open(csv_convert, 'w', encoding='utf-8') as myfile3:
        # Header row: use the first user's attribute names as the column names
        for obj1 in config2["users"]["1"]["attributes"]:
            myfile3.write('%s,' % obj1)
        for uid in config2["users"]:
            print("Currently converting user %s to csv." % uid, end='\r')
            myfile3.write('\n')
            for attribute in config2["users"]["1"]["attributes"]:
                if attribute != "managedBy" and attribute != "Category" and attribute != "Full_Name":
                    # Original LDAP attributes come back as lists, so take the first value
                    try:
                        user_atrb = str(config2["users"][uid]["attributes"][attribute][0])
                        user_atrb = '"' + user_atrb + '"'
                    except IndexError:
                        user_atrb = ''
                else:
                    # The calculated fields added above are plain strings
                    try:
                        user_atrb = str(config2["users"][uid]["attributes"][attribute])
                        user_atrb = '"' + user_atrb + '"'
                    except IndexError:
                        user_atrb = ''
                myfile3.write('%s,' % user_atrb)
    logstring = "Finished converting %s users to csv format.\n" % uid
    writelog(writetolog, logstring)
    # The original JSON output above is used to write the CSV, but that JSON is not line delimited;
    # the block below converts the CSV into line-delimited JSON so that Filebeat doesn't throw a fit.
    writelog(writetolog, 'Converting JSON output into line delimitation...')
    csvfile = open(csv_convert, 'r')
    jsonfile = open(final_ad, 'w')
    deptfile = open(ad_dept, 'w')
    catfile = open(ad_cat, 'w')
    fnfile = open(ad_fn, 'w')
    fieldnames = ("businessUnitDesc", "co", "department", "distinguishedName", "extensionAttribute1",
                  "givenName", "l", "mail", "manager", "managerLevelDesc", "mobile", "sAMAccountName",
                  "sn", "telephoneNumber", "title", "whenCreated", "managedBy", "Category", "Full_Name",)
    reader = csv.DictReader(csvfile, fieldnames)
    rowstep = 0
    for row in reader:
        # Skip the header row written by the CSV conversion above
        if rowstep != 0:
            json.dump(row, jsonfile)
            jsonfile.write('\n')
        rowstep += 1
    csvfile.close()
    writelog(writetolog, 'Finished.')
    # Converting individual AD objects into a 1:1 format in YAML (user: <<object>>) --
    # this is because Logstash cannot do 1:many translations
    writelog(writetolog, 'Converting to YAML...')
    csvfile = open(csv_convert, 'r')
    reader = csv.reader(csvfile)
    rowstep = 0
    for row in reader:
        # ADEnrichlookup YAML Conversion (skip the CSV header row)
        if rowstep != 0:
            dep = row[2]
            eid = row[11].lower()
            cat = row[17]
            fulln = row[18]
            # Department
            deptfile.write('"%s" : "%s"\n' % (eid, dep))
            # Category
            catfile.write('"%s" : "%s"\n' % (eid, cat))
            # Full Name
            fnfile.write('"%s" : "%s"\n' % (eid, fulln))
        rowstep += 1
    # Close the per-DN output files so everything is flushed before the combine step below
    csvfile.close()
    jsonfile.close()
    deptfile.close()
    catfile.close()
    fnfile.close()
# Combine full AD JSON data
writelog(writetolog, 'Combining JSON output...')
filenames = ['/etc/addata/results/AllUsers_ad_json.json', '/etc/addata/results/Migration_ad_json.json']
combinefiles(filenames, combined_ad_json)
# combine AD YAML files and dedupe
writelog(writetolog, 'Combining AD Category YAML output...')
filenames = ['/etc/addata/results/AllUsers_cat_ad.yaml', '/etc/addata/results/Migration_cat_ad.yaml']
combinefiles(filenames, combined_cat_ad)
dedupefiles(combined_cat_ad, final_cat_ad)
writelog(writetolog, 'Combining AD Department YAML output...')
filenames = ['/etc/addata/results/AllUsers_dep_ad.yaml', '/etc/addata/results/Migration_dep_ad.yaml']
combinefiles(filenames, combined_dep_ad)
dedupefiles(combined_dep_ad, final_dep_ad)
writelog(writetolog, 'Combining AD Full Name YAML output...')
filenames = ['/etc/addata/results/AllUsers_fn_ad.yaml', '/etc/addata/results/Migration_fn_ad.yaml']
combinefiles(filenames, combined_fn_ad)
dedupefiles(combined_fn_ad, final_fn_ad)
# cleanup
writelog(writetolog, 'Cleaning up...')
cleanfiles = ['/etc/addata/results/AllUsers_ad_json.json', '/etc/addata/results/Migration_ad_json.json',
'/etc/addata/results/AllUsers_cat_ad.yaml', '/etc/addata/results/Migration_cat_ad.yaml',
'/etc/addata/results/AllUsers_orig_json.json', '/etc/addata/results/Migration_orig_json.json',
'/etc/addata/results/AllUsers_csvexport.csv', '/etc/addata/results/Migration_csvexport.csv',
'/etc/addata/results/AllUsers_dep_ad.yaml', '/etc/addata/results/Migration_dep_ad.yaml',
'/etc/addata/results/AllUsers_enhanced_data.json', '/etc/addata/results/Migration_enhanced_data.json',
'/etc/addata/results/AllUsers_fn_ad.yaml', '/etc/addata/results/Migration_fn_ad.yaml',
'/etc/addata/results/combined_cat_ad.yaml', '/etc/addata/results/combined_dep_ad.yaml',
'/etc/addata/results/combined_fn_ad.yaml']
for file in cleanfiles:
    try:
        os.remove(file)
    except OSError:
        pass
writelog(writetolog, 'Job done.')
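If you want to sanity-check the final line-delimited JSON before pointing Filebeat or Logstash at it, a quick standalone check along these lines works (my own addition, not part of the script; the path matches the combined output above):
import json

# Verify that every line of the combined output parses as a standalone JSON object
with open('/etc/addata/results/ad_json.json') as ndjson:
    for lineno, line in enumerate(ndjson, 1):
        try:
            json.loads(line)
        except json.JSONDecodeError:
            print('Bad JSON on line %s: %s' % (lineno, line[:80]))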