forked from hochshi/DomainMap
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathimportECODMapping.py
More file actions
106 lines (92 loc) · 4.54 KB
/
Copy pathimportECODMapping.py
File metadata and controls
106 lines (92 loc) · 4.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
__author__ = 'lab'
import csv
import argparse
import requests
import json
import base64
import re
from string import Template
# URL that createNEO4JTran() POSTs to in order to open a transaction on the
# local Neo4j server.
NEO4J_CREATE_TRAN_URL = "http://localhost:7474/db/data/transaction"
# "user:password" credentials; generateHeaders() base64-encodes this value for
# the Authorization header.
# NOTE(review): the password is hard-coded in source control - consider
# reading it from the environment or a config file instead.
NEO4J_USER_PASS = 'neo4j:Sheshi6'
def parse_args():
    """Parse the command-line arguments of the ECOD mapping importer.

    Returns:
        argparse.Namespace: its ``source`` attribute is the required input
        file, already opened for reading in text mode.
    """
    parser = argparse.ArgumentParser(description="Import ECOD to pdb and uniprot mapping.")
    # argparse.FileType opens the given path on both Python 2 and Python 3;
    # the ``file`` builtin previously used as the type exists only on Python 2.
    parser.add_argument("-src", "--source", help="Source file with SCOPE IDs",
                        type=argparse.FileType('r'), required=True)
    return parser.parse_args()
def main():
    """Read the tab-separated mapping file named on the command line and load it into Neo4j."""
    args = parse_args()
    # build_domMap() consumes every row, so the file can be closed before the
    # (potentially long) database import starts.
    with args.source as source:
        csv_reader = csv.DictReader(source, delimiter="\t")
        mapDict = build_domMap(csv_reader)
    createNEO4JMapping(mapDict)
def build_domMap(csv_reader):
    """Collect the PDB-chain and UniProt residue ranges of every domain.

    Args:
        csv_reader: iterable of dict-like rows. Each row is indexed by
            ``uid`` here and handed to getPdbChainResMap() and
            getUNPResMap(), which read the remaining columns.

    Returns:
        dict: ``{'chain': {uid: {pdbchain: [range, ...]}},
        'unp': {uid: {unp_acc: [range, ...]}}}`` as filled in by the two
        helper functions.
    """
    dom2chainMap = {}
    dom2unpMap = {}
    # Raw string: "\-" in a normal string literal is an invalid escape
    # sequence. The pattern text itself is unchanged.
    pattern = re.compile(r'[0-9]+\-[0-9]+')
    for row in csv_reader:
        uid = row['uid']
        # Several rows may share a uid; keep what earlier rows collected.
        dom2chainMap.setdefault(uid, {})
        getPdbChainResMap(row, dom2chainMap)
        dom2unpMap.setdefault(uid, {})
        getUNPResMap(row, dom2unpMap, pattern)
    return {'chain': dom2chainMap, 'unp': dom2unpMap}
def getPdbChainResMap(row, dom2chainMap):
    """Record the PDB-chain residue ranges of one row, in place.

    Args:
        row: mapping with the keys ``uid``, ``pdb_range`` (comma-separated
            ``chain:range`` items) and ``ecod_domain_id``.
        dom2chainMap: ``{uid: {pdbchain: [range, ...]}}``; the entry for
            ``row['uid']`` must already exist and is updated in place.

    Each range is stored as the list produced by splitting on its last ``-``
    and is added only once per chain.
    """
    for segment in row['pdb_range'].split(','):
        chain_id, residues = segment.split(':')
        # NOTE(review): the slice [1:4] keeps three characters of the domain
        # id - confirm this matches the PDBChain ids stored in the database.
        chain_key = row['ecod_domain_id'][1:4] + chain_id
        ranges = dom2chainMap[row['uid']].setdefault(chain_key, [])
        # Split on the last '-' only, so a leading minus sign survives.
        bounds = residues.rsplit('-', 1)
        if bounds in ranges:
            continue
        ranges.append(bounds)
def getUNPResMap(row, dom2unpMap, pattern):
    """Record the UniProt residue ranges of one row, in place.

    Args:
        row: mapping with the keys ``uid``, ``unp_acc`` and ``unp_range``
            (comma-separated ranges).
        dom2unpMap: ``{uid: {unp_acc: [range, ...]}}``; the entry for
            ``row['uid']`` must already exist and is updated in place.
        pattern: compiled regex; the uid of the row is printed for every
            range that does not match it (the range is still recorded).

    Each range is stored as the list produced by splitting on its last ``-``
    and is added only once per accession.
    """
    accession = row['unp_acc']
    for segment in row['unp_range'].split(','):
        if pattern.match(segment) is None:
            print(row['uid']+'\n')
        ranges = dom2unpMap[row['uid']].setdefault(accession, [])
        bounds = segment.rsplit('-', 1)
        if bounds in ranges:
            continue
        ranges.append(bounds)
def generateHeaders():
    """Build the HTTP headers sent with every request to the Neo4j server.

    Returns:
        dict: ``Authorization`` (HTTP Basic credentials derived from
        NEO4J_USER_PASS), ``Accept`` and ``Content-Type`` headers.
    """
    # b64encode works on bytes; encode first and decode the result so the
    # header value is text on both Python 2 and Python 3.
    token = base64.b64encode(NEO4J_USER_PASS.encode('utf-8')).decode('ascii')
    # HTTP Basic authentication requires the scheme name before the token.
    return {'Authorization': 'Basic ' + token,
            'Accept': 'application/json; charset=UTF-8',
            'Content-Type': 'application/json'}
def createNEO4JMapping(mapdict):
    """Write every mapping in *mapdict* to Neo4j inside one transaction.

    Opens a transaction, sends the mapping statements to it and finally
    commits it.
    """
    tran_url = createNEO4JTran()
    createMappings(mapdict, tran_url)
    commitTran(tran_url)
def createNEO4JTran():
    """Open a new transaction on the Neo4j server.

    Returns:
        str: the value of the ``Location`` response header, used by the
        caller as the URL of the opened transaction.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    r = requests.post(NEO4J_CREATE_TRAN_URL, headers=generateHeaders())
    # Fail with the real HTTP error (e.g. 401) instead of a KeyError on the
    # missing Location header.
    r.raise_for_status()
    return r.headers['Location']
def commitTran(trans_location):
    """Commit the transaction located at *trans_location*.

    Args:
        trans_location: transaction URL as returned by createNEO4JTran().

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    r = requests.post(trans_location+'/commit', headers=generateHeaders())
    # A failed commit must not go unnoticed: the whole import would be lost.
    r.raise_for_status()
def createMappings(mapdict, trans_location):
    """Send the MATCHES relationships of every domain to an open transaction.

    One request is posted per domain. It contains one statement for each PDB
    chain and one for each UniProt entry the domain maps to.

    Args:
        mapdict: ``{'chain': {uid: {pdbchain: [range, ...]}},
            'unp': {uid: {unp_acc: [range, ...]}}}`` as built by
            build_domMap(); every uid in ``'chain'`` must also be in ``'unp'``.
        trans_location: URL of the open transaction.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        RuntimeError: if the server reports errors for the statements.
    """
    # Values travel as statement parameters instead of being pasted into the
    # Cypher text, so they need no quoting or escaping. The "{name}" parameter
    # form is the legacy Cypher syntax.
    # NOTE(review): on servers that only accept "$name" parameters, switch
    # the placeholders accordingly.
    pdb_cypher = ('MATCH (d:Domain {uid: {domain_uid}}), (c:PDBChain {id: {pdbchain_id}}) '
                  'CREATE (d)-[:MATCHES {res: {res}}]->(c)')
    unp_cypher = ('MATCH (d:Domain {uid: {domain_uid}}), (u:UniprotEntry {accession: {unp_acc}}) '
                  'CREATE (d)-[:MATCHES {res: {res}}]->(u)')

    def flatten(ranges):
        # Nested lists are not sent as the property value; each [start, end]
        # pair is joined back into its original "start-end" string.
        return ['-'.join(bounds) for bounds in ranges]

    for domain_uid, chains in mapdict['chain'].items():
        statements = []
        for pdbchain_id, ranges in chains.items():
            statements.append({'statement': pdb_cypher,
                               'parameters': {'domain_uid': domain_uid,
                                              'pdbchain_id': pdbchain_id,
                                              'res': flatten(ranges)}})
        for unp_acc, ranges in mapdict['unp'][domain_uid].items():
            statements.append({'statement': unp_cypher,
                               'parameters': {'domain_uid': domain_uid,
                                              'unp_acc': unp_acc,
                                              'res': flatten(ranges)}})
        r = requests.post(trans_location, headers=generateHeaders(),
                          data=json.dumps({'statements': statements}))
        r.raise_for_status()
        # Statement failures are reported in the response body, so inspect it
        # as well as the HTTP status.
        errors = r.json().get('errors')
        if errors:
            raise RuntimeError('Neo4j rejected the mappings of domain %s: %s'
                               % (domain_uid, errors))
# Run the import only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()