Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

from requests.adapters import BaseAdapter 

from requests.compat import urlparse, unquote 

from requests import Response, codes 

import errno 

import os 

import stat 

import locale 

import io 

 

from six import BytesIO 

 

class FileAdapter(BaseAdapter):
    """Adapter that answers ``file:`` URL requests from the local filesystem."""

    def send(self, request, **kwargs):
        """Wrap the file described by ``request`` in a Response object.

        The URL path is split on ``/``, percent-decoded piece by piece and
        reassembled into an absolute local path, which is then opened in
        binary mode and attached to the response as ``raw``.

        An ``IOError`` raised while resolving or opening the path is not
        propagated. It is reported through the response instead:
        ``codes.forbidden`` for ``EACCES``, ``codes.not_found`` for ``ENOENT``
        and ``codes.bad_request`` for anything else, with the error text as
        the body.

        :param request: The PreparedRequest being "sent".
        :param kwargs: Accepted for adapter-interface compatibility; unused.
        :returns: a Response object containing the file (or the error text)
        :raises ValueError: if the method is neither GET nor HEAD, or if the
            URL names a host other than ``localhost``.
        """

        # Check that the method makes sense. Only GET and HEAD are supported.
        if request.method not in ("GET", "HEAD"):
            raise ValueError("Invalid request method %s" % request.method)

        # Parse the URL
        url_parts = urlparse(request.url)

        # Reject URLs with a hostname component
        if url_parts.netloc and url_parts.netloc != "localhost":
            raise ValueError("file: URLs with hostname components are not permitted")

        resp = Response()

        # Open the file, translate certain errors into HTTP responses
        # Use urllib's unquote to translate percent escapes into whatever
        # they actually need to be
        try:
            # Split the path on / (the URL directory separator) and decode any
            # % escapes in the parts
            path_parts = [unquote(p) for p in url_parts.path.split('/')]

            # Strip out the leading empty parts created from the leading /'s
            while path_parts and not path_parts[0]:
                path_parts.pop(0)

            # If os.sep is in any of the parts, someone fed us some shenanigans.
            # Treat it like a missing file.
            if any(os.sep in p for p in path_parts):
                raise IOError(errno.ENOENT, os.strerror(errno.ENOENT))

            # Look for a drive component. If one is present, store it separately
            # so that a directory separator can correctly be added to the real
            # path, and remove any empty path parts between the drive and the path.
            # Assume that a part ending with : or | (legacy) is a drive.
            if path_parts and path_parts[0].endswith(('|', ':')):
                path_drive = path_parts.pop(0)
                if path_drive.endswith('|'):
                    path_drive = path_drive[:-1] + ':'

                while path_parts and not path_parts[0]:
                    path_parts.pop(0)
            else:
                path_drive = ''

            # Try to put the path back together
            # Join the drive back in, and stick os.sep in front of the path to
            # make it absolute.
            # os.path.join() needs at least one argument, so a URL that leaves
            # no path parts (e.g. "file:///") must be handled explicitly rather
            # than escaping as an uncaught TypeError.
            path_tail = os.path.join(*path_parts) if path_parts else ''
            path = path_drive + os.sep + path_tail

            # Check if the drive assumptions above were correct. If path_drive
            # is set, and os.path.splitdrive does not return a drive, it wasn't
            # really a drive. Put the path together again treating path_drive
            # as a normal path component.
            # splitdrive() returns a (drive, tail) tuple, which is always
            # truthy, so the drive element itself has to be tested.
            if path_drive and not os.path.splitdrive(path)[0]:
                path = os.sep + os.path.join(path_drive, *path_parts)

            # Use io.open since we need to add a release_conn method, and
            # methods can't be added to file objects in python 2.
            resp.raw = io.open(path, "rb")
            resp.raw.release_conn = resp.raw.close
        except IOError as e:
            if e.errno == errno.EACCES:
                resp.status_code = codes.forbidden
            elif e.errno == errno.ENOENT:
                resp.status_code = codes.not_found
            else:
                resp.status_code = codes.bad_request

            # Wrap the error message in a file-like object
            # The error message will be localized, try to convert the string
            # representation of the exception into a byte stream
            resp_str = str(e).encode(locale.getpreferredencoding(False))
            resp.raw = BytesIO(resp_str)
            resp.headers['Content-Length'] = len(resp_str)

            # Add release_conn to the BytesIO object
            resp.raw.release_conn = resp.raw.close
        else:
            resp.status_code = codes.ok
            resp.url = request.url

            # If it's a regular file, set the Content-Length
            resp_stat = os.fstat(resp.raw.fileno())
            if stat.S_ISREG(resp_stat.st_mode):
                resp.headers['Content-Length'] = resp_stat.st_size

        return resp

    def close(self):
        """Do nothing; this adapter keeps no state that needs releasing."""
        pass