# ETL pipeline development (Desarrollo de la ETL Pipeline)
from elasticsearch import Elasticsearch, RequestsHttpConnection
from elasticsearch_dsl import Search, Q
from requests_aws4auth import AWS4Auth
import pandas as pd
def getData(index, fields, query=None, endpoint=endpoint):
    """Pull documents from an Elasticsearch index into a pandas DataFrame.

    Parameters
    ----------
    index : str
        Name of the Elasticsearch index to query.
    fields : list[str]
        Source fields to extract from each document.
    query : elasticsearch_dsl.Q, optional
        Filter to apply; by default no filter is applied.
    endpoint : str
        Host of the Elasticsearch API (AWS managed service).

    NOTE(review): ``accessKey``, ``secretKey``, ``region``, ``service``,
    ``port`` and the default ``endpoint`` are read from module scope and are
    not defined in this file chunk — confirm they exist before running.
    """
    # Elasticsearch is consumed as an AWS managed service, so requests must
    # be signed with AWS SigV4 credentials for the given region/service.
    auth = AWS4Auth(accessKey, secretKey, region, service)
    es = Elasticsearch(
        hosts=[{"host": endpoint, "port": port}],
        http_auth=auth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
    )
    # Build the search once; the original duplicated this in both branches.
    search = Search(using=es, index=index).source(includes=fields)
    if query:
        search = search.query("bool", filter=query)
    # scan() streams every matching hit; flatten each into a DataFrame row.
    return pd.DataFrame(d.to_dict() for d in search.scan())
def getPropertiesData():
    """Fetch Spanish properties from the ``property`` index as a GeoDataFrame.

    Flattens the nested ``address`` field into ``province``, ``propertyCity``,
    ``lon`` and ``lat`` columns, strips accented characters and builds a
    shapely Point geometry per property.

    NOTE(review): relies on module-level ``es``, ``Point`` and ``gpd`` defined
    elsewhere in the file — confirm before running standalone.
    """
    s1 = Search(using=es, index="property").source(includes=[
        "address.locality",
        "address.administrativeAreaLevel2",
        "address.country",
        "id",
        "address.geoLocation",
    ])
    data = pd.DataFrame(d.to_dict() for d in s1.scan())
    # ``address`` is NaN (a float) for documents without one, hence the guard.
    data["country"] = data.address.apply(
        lambda x: x.get("country") if type(x) is not float else "")
    data["province"] = data.address.apply(
        lambda x: x.get("administrativeAreaLevel2") if type(x) is not float else "")
    data = data[data.country == "España"]
    # BUG FIX: the original called ``data(columns=...)`` — the ``.rename``
    # method name was missing, which raises "DataFrame is not callable".
    data.rename(columns={"id": "propertyId"}, inplace=True)
    data["lon"] = data.address.apply(
        lambda x: x.get("geoLocation").get("lon") if type(x) is not float else "")
    data["lat"] = data.address.apply(
        lambda x: x.get("geoLocation").get("lat") if type(x) is not float else "")
    data["propertyCity"] = data.address.apply(
        lambda x: x.get("locality") if type(x) is not float else "")
    # BUG FIX: DataFrame.replace without regex=True only replaces cells whose
    # ENTIRE value matches (e.g. a cell equal to "á"), so the accent stripping
    # was a no-op for real names like "Málaga". regex=True substitutes
    # substrings as intended.
    accents = {
        "Á": "A", "á": "a", "À": "A", "à": "a",
        "É": "E", "é": "e", "È": "E", "è": "e",
        "Í": "I", "í": "i", "Ì": "I", "ì": "i",
        "Ó": "O", "ó": "o", "Ò": "O", "ò": "o",
        "Ú": "U", "ú": "u", "Ù": "U", "ù": "u",
        "Ñ": "NY", "ñ": "ny", "Ü": "U", "ü": "u",
    }
    data = data.replace(accents, regex=True)
    data = data.drop(["address", "country"], axis=1)
    geometry = data.apply(lambda x: Point(x.lon, x.lat), axis=1)
    return gpd.GeoDataFrame(data, geometry=geometry)
def getOpportunitiesData():
    """Fetch Spanish opportunities (deals) from ``deal*`` as a GeoDataFrame.

    Flattens the nested ``address`` field into ``province``, ``lon`` and
    ``lat`` columns, strips accented characters and builds a shapely Point
    geometry per opportunity. ``rentalRequestStage`` is kept for downstream
    booking metrics.

    NOTE(review): relies on module-level ``es``, ``Point`` and ``gpd`` defined
    elsewhere in the file — confirm before running standalone.
    """
    s1 = Search(using=es, index="deal*").source(includes=[
        "address.locality",
        "address.administrativeAreaLevel2",
        "rentalRequestStage",
        "address.country",
        "id",
        "address.geoLocation",
    ])
    data = pd.DataFrame(d.to_dict() for d in s1.scan())
    # ``address`` is NaN (a float) for documents without one, hence the guard.
    data["country"] = data.address.apply(
        lambda x: x.get("country") if type(x) is not float else "")
    data["province"] = data.address.apply(
        lambda x: x.get("administrativeAreaLevel2") if type(x) is not float else "")
    data = data[data.country == "España"]
    data["lon"] = data.address.apply(
        lambda x: x.get("geoLocation").get("lon") if type(x) is not float else "")
    data["lat"] = data.address.apply(
        lambda x: x.get("geoLocation").get("lat") if type(x) is not float else "")
    # BUG FIX: DataFrame.replace without regex=True only matches whole cell
    # values, so the accent stripping never touched real place names;
    # regex=True performs the intended substring substitution.
    accents = {
        "Á": "A", "á": "a", "À": "A", "à": "a",
        "É": "E", "é": "e", "È": "E", "è": "e",
        "Í": "I", "í": "i", "Ì": "I", "ì": "i",
        "Ó": "O", "ó": "o", "Ò": "O", "ò": "o",
        "Ú": "U", "ú": "u", "Ù": "U", "ù": "u",
        "Ñ": "NY", "ñ": "ny", "Ü": "U", "ü": "u",
    }
    data = data.replace(accents, regex=True)
    data = data.drop(["address", "country"], axis=1)
    geometry = data.apply(lambda x: Point(x.lon, x.lat), axis=1)
    return gpd.GeoDataFrame(data, geometry=geometry)
def getProposalsData():
    """Fetch resolved proposals (status OK, stage ACCEPTED/REJECTED).

    Returns a DataFrame with one row per proposal: ``propertyId``,
    ``propertyCity``, ``province``, ``monthlyPrice``, ``beds``, ``duration``
    (stay length in days) and ``proposalStage``. Proposals without valid
    check-in/check-out timestamps are discarded.

    NOTE(review): relies on a module-level ``es`` client defined elsewhere in
    the file — confirm before running standalone.
    """
    statusQuery = Q("match", proposalStatus="OK")
    acceptedQuery = Q("match", proposalStage="ACCEPTED")
    rejectedQuery = Q("match", proposalStage="REJECTED")
    # Only proposals that reached a terminal decision.
    completeQuery = statusQuery & (acceptedQuery | rejectedQuery)
    s1 = Search(using=es, index="proposal*").source(includes=[
        "address.locality",
        "address.administrativeAreaLevel2",
        "address.postalCode",
        "dateCheckIn",
        "dateCheckOut",
        "monthlyPriceDossier.amount",
        "property.beds",
        "proposalStage",
        "proposalStatus",
        "property.id",
        "creationDate",
        "address.geoLocation",
    ]).query("bool", filter=completeQuery)
    data = pd.DataFrame(d.to_dict() for d in s1.scan())
    # Drop proposals with missing/zero epoch timestamps before converting.
    data = data[(data.dateCheckIn > 0) & (data.dateCheckOut > 0)]
    # Nested fields are NaN (a float) when absent, hence the guards.
    # ``data["property"]`` (bracket access) avoids any ambiguity with
    # attribute lookup on the reserved-sounding column name.
    data["propertyId"] = data["property"].apply(
        lambda x: x.get("id") if type(x) is not float else "")
    data["propertyCity"] = data.address.apply(
        lambda x: x.get("locality") if type(x) is not float else "")
    data["province"] = data.address.apply(
        lambda x: x.get("administrativeAreaLevel2") if type(x) is not float else "")
    data["monthlyPrice"] = data.monthlyPriceDossier.apply(
        lambda x: x.get("amount") if type(x) is not float else "")
    data["dateCheckIn"] = pd.to_datetime(data["dateCheckIn"], unit="ms")
    data["dateCheckOut"] = pd.to_datetime(data["dateCheckOut"], unit="ms")
    data["beds"] = data["property"].apply(
        lambda x: x.get("beds") if type(x) is not float else "")
    data["duration"] = (data["dateCheckOut"] - data["dateCheckIn"]).dt.days
    data = data.drop(
        ["property",
         "proposalStatus",
         "address",
         "monthlyPriceDossier",
         "creationDate",
         "dateCheckIn",
         "dateCheckOut"], axis=1,
    )
    # BUG FIX: DataFrame.replace without regex=True only matches whole cell
    # values, so the accent stripping never touched real place names;
    # regex=True performs the intended substring substitution.
    accents = {
        "Á": "A", "á": "a", "À": "A", "à": "a",
        "É": "E", "é": "e", "È": "E", "è": "e",
        "Í": "I", "í": "i", "Ì": "I", "ì": "i",
        "Ó": "O", "ó": "o", "Ò": "O", "ò": "o",
        "Ú": "U", "ú": "u", "Ù": "U", "ù": "u",
        "Ñ": "NY", "ñ": "ny", "Ü": "U", "ü": "u",
    }
    return data.replace(accents, regex=True)
def getDistrictsFromDBX(zip_file_url):
    """Download a zipped districts shapefile and load it reprojected to WGS84.

    Parameters
    ----------
    zip_file_url : str
        URL of the zip archive containing ``coordenadasDistritos.shp`` and
        its sidecar files.

    NOTE(review): relies on module-level ``requests``, ``zipfile``, ``io`` and
    ``gpd`` imports defined elsewhere in the file — confirm before running.
    """
    response = requests.get(zip_file_url)
    # Extract in-memory; the shapefile sidecars land in the current directory.
    # (Context manager closes the archive — the original leaked the handle.)
    with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        archive.extractall()
    distritos = gpd.GeoDataFrame.from_file("coordenadasDistritos.shp")
    # Reproject to WGS84 lon/lat so districts align with Elasticsearch points.
    return distritos.to_crs(epsg=4326)
def getPropertiesFromDBX(zip_file_url):
    """Download and extract the zipped properties shapefile from Dropbox.

    Parameters
    ----------
    zip_file_url : str
        URL of the zip archive containing ``properties.shp`` and its sidecars.

    NOTE(review): relies on module-level ``requests``, ``zipfile``, ``io`` and
    ``gpd`` imports defined elsewhere in the file — confirm before running.
    """
    response = requests.get(zip_file_url)
    # Context manager closes the archive — the original leaked the handle.
    with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        archive.extractall()
    return gpd.GeoDataFrame.from_file("properties.shp")
def getOpportunitiesFromDBX(url):
    """Load the cached opportunities CSV from Dropbox.

    Parameters
    ----------
    url : str or file-like
        Location of the CSV (anything ``pd.read_csv`` accepts).

    Returns the DataFrame with the pandas index artifact column removed.
    """
    opportunitiesDistritos = pd.read_csv(url)
    # "Unnamed: 0" is the stray index column produced by to_csv(); tolerate
    # files saved without it (the original raised KeyError in that case).
    opportunitiesDistritos.drop(columns=["Unnamed: 0"], errors="ignore", inplace=True)
    return opportunitiesDistritos
def assignDistrictsToProperties(propertiesData):
    """Assign a census district (CUDIS), municipality and district polygon to
    each property point, then merge with the cached assignments.

    Only properties not already present in the Dropbox cache are processed;
    the result is the cache plus the newly assigned rows, deduplicated by
    ``propertyId``.

    NOTE(review): ``getPropertiesFromDBX``/``getDistrictsFromDBX`` require a
    URL argument in this file but are called without one here — TODO confirm
    defaults exist in the deployed module.
    """
    propertiesGeometry = getPropertiesFromDBX()
    distritos = getDistrictsFromDBX()
    # Keep only properties that have not been assigned in a previous run.
    propertiesData = propertiesData[~propertiesData["propertyId"]
                                    .isin(propertiesGeometry["propertyId"])].reset_index()
    properties = propertiesData.copy()
    properties["distrito"] = np.nan
    properties["municipio"] = np.nan
    properties["polygon"] = np.nan
    total = properties.shape[0]
    done = 0
    for i, inmueble in properties.iterrows():
        # Restrict candidate districts to the property's province (NPRO) to
        # avoid testing every polygon in the country.
        candidates = distritos[distritos.NPRO == inmueble.province]
        point = inmueble.geometry
        for _, distrito in candidates.iterrows():
            if distrito.geometry.contains(point):
                # BUG FIX: the original used chained indexing
                # (properties["distrito"].loc[i] = ...), which can assign to a
                # temporary copy (SettingWithCopy); .at writes to the frame.
                properties.at[i, "distrito"] = distrito.CUDIS
                properties.at[i, "municipio"] = distrito.NMUN
                properties.at[i, "polygon"] = distrito.geometry
                break
        done += 1
        if done % 100 == 0:
            print(str(done) + " completados de " + str(total))
    # Replace the point geometry with the matched district polygon.
    properties.drop(columns=["geometry", "index"], inplace=True)
    properties.rename(columns={"polygon": "geometry"}, inplace=True)
    properties = gpd.GeoDataFrame(properties, geometry=properties.geometry)
    properties = properties[["propertyId", "municipio", "distrito", "geometry"]]
    properties = pd.concat([propertiesGeometry, properties], sort=True)
    return properties.drop_duplicates(subset="propertyId")
def assignDistrictsToOpportunities(opportunitiesData):
    """Assign a census district (CUDIS) and municipality to each opportunity
    point, then append the new rows to the cached assignments.

    NOTE(review): ``url`` is read from module scope and
    ``coordenadasDistritos.shp`` must already exist in the working directory —
    confirm the pipeline ordering guarantees both.
    """
    opportunitiesDistritos = getOpportunitiesFromDBX(url)
    # Keep only opportunities that have not been assigned in a previous run.
    opportunitiesData = opportunitiesData[~opportunitiesData["id"]
                                          .isin(opportunitiesDistritos["id"])].reset_index()
    opportunitiesData.drop(columns=["index"], inplace=True)
    distritos = gpd.GeoDataFrame.from_file("coordenadasDistritos.shp")
    # Reproject to WGS84 lon/lat so polygons align with the point geometries.
    distritos = distritos.to_crs(epsg=4326)
    opportunities = opportunitiesData.copy()
    opportunities["distrito"] = np.nan
    opportunities["municipio"] = np.nan
    total = opportunities.shape[0]
    done = 0
    for i, opportunity in opportunities.iterrows():
        # Restrict candidate districts to the opportunity's province (NPRO).
        candidates = distritos[distritos.NPRO == opportunity.province]
        point = opportunity.geometry
        for _, distrito in candidates.iterrows():
            if distrito.geometry.contains(point):
                # BUG FIX: the original used chained indexing
                # (opportunities["distrito"].loc[i] = ...), which can assign
                # to a temporary copy (SettingWithCopy); .at writes directly.
                opportunities.at[i, "distrito"] = distrito.CUDIS
                opportunities.at[i, "municipio"] = distrito.NMUN
                break
        done += 1
        if done % 100 == 0:
            print(str(done) + " completados de " + str(total))
    opportunities.drop(columns=["geometry", "lon", "lat"], inplace=True)
    # "x" suffix — presumably forces district codes to stay textual when the
    # frame round-trips through CSV; TODO confirm downstream expectation.
    opportunities["distrito"] = opportunities.distrito.astype(str) + "x"
    return pd.concat([opportunitiesDistritos, opportunities], sort=True)
def assignDistrictsToProposals(proposalsData):
    """Attach district geometry to proposals by left-joining on ``propertyId``.

    NOTE(review): ``getPropertiesFromDBX`` requires a URL argument in this
    file but is called without one here — confirm a default exists at deploy
    time.
    """
    propertiesGeometry = getPropertiesFromDBX()
    merged = proposalsData.merge(propertiesGeometry, how="left", on="propertyId")
    return gpd.GeoDataFrame(merged, geometry=merged.geometry)
def groupByDistricts(proposalsData):
    """Aggregate proposal/opportunity activity per district into heatmap rows.

    Returns a GeoDataFrame with one row per district: median monthly price per
    room count, proposal/property/opportunity/booking counts, median stay
    duration, and derived ratios (conversionRatio, benchmark, propertiesUsed,
    propertiesConverted).

    NOTE(review): the original referenced an undefined name ``properties`` and
    calls the DBX loaders without their required URL arguments — presumably
    module-level defaults exist in the deployed version; confirm.
    """
    opportunities = getOpportunitiesFromDBX()
    # BUG FIX: the original used ``properties`` without ever defining it
    # (NameError); load it the same way the sibling functions do.
    properties = getPropertiesFromDBX()
    pricePerRooms = (proposalsData.dropna(subset=["monthlyPrice"])
                     .groupby(["distrito", "rooms"])["monthlyPrice"].median()
                     .reset_index()
                     .pivot(index="distrito", columns="rooms", values="monthlyPrice")
                     .reset_index())
    proposalsNumber = (proposalsData.groupby("distrito")["proposalStage"].count()
                       .reset_index()
                       .sort_values(by="proposalStage", ascending=False)
                       .rename(columns={"proposalStage": "proposalsNumber"}))
    propertiesProposedPerDistrict = (proposalsData.groupby("distrito")["propertyId"]
                                     .nunique().reset_index()
                                     .rename(columns={"propertyId": "numberPropertiesProposed"}))
    accepted = proposalsData[proposalsData.proposalStage == "ACCEPTED"]
    propertiesAcceptedPerDistrict = (accepted.groupby("distrito")["propertyId"]
                                     .nunique().reset_index()
                                     .rename(columns={"propertyId": "numberPropertiesAccepted"}))
    propertiesPerDistrict = (properties[["distrito", "propertyId"]]
                             .groupby("distrito").count().reset_index()
                             .rename(columns={"propertyId": "numberProperties"}))
    acceptedProposalsNumber = (accepted.groupby("distrito")["proposalStage"].count()
                               .reset_index()
                               .sort_values(by="proposalStage", ascending=False)
                               .rename(columns={"proposalStage": "acceptedProposalsNumber"}))
    durationPerDistrict = proposalsData.groupby("distrito")["duration"].median().reset_index()
    opportunitiesPerDistrict = (opportunities.groupby("distrito")["id"].count()
                                .reset_index()
                                .rename(columns={"id": "numOpportunities"})
                                .sort_values(ascending=False, by="numOpportunities"))
    # "Booked" = stage at or past contract/invoice management.
    bookedStages = [
        "S030_CONTRACT_INVOICE_MANAGMENT",
        "S040_CHECK_IN",
        "S050_RENTAL_CONFIRMED",
        "S060_CONVERSION_HISTORY",
    ]
    bookingsPerDistrict = (opportunities[opportunities.rentalRequestStage.isin(bookedStages)]
                           .groupby("distrito")["id"].count().reset_index()
                           .rename(columns={"id": "numBookings"})
                           .sort_values(ascending=False, by="numBookings"))
    distritos = (proposalsData[["distrito", "geometry", "municipio", "propertyCity"]]
                 .drop_duplicates(subset="distrito"))
    # BUG FIX: the original ended the last .merge(...) line with a stray
    # line-continuation backslash, gluing the next statement onto the
    # expression and making the function a SyntaxError.
    heatmap_data = (proposalsNumber
                    .merge(acceptedProposalsNumber, how="left", on="distrito")
                    .merge(propertiesPerDistrict, how="left", on="distrito")
                    .merge(propertiesProposedPerDistrict, how="left", on="distrito")
                    .merge(propertiesAcceptedPerDistrict, how="left", on="distrito")
                    .merge(durationPerDistrict, how="left", on="distrito")
                    .merge(pricePerRooms, how="left", on="distrito")
                    .merge(opportunitiesPerDistrict, how="left", on="distrito")
                    .merge(bookingsPerDistrict, how="left", on="distrito")
                    .merge(distritos, how="left", on="distrito"))
    heatmap_data = gpd.GeoDataFrame(heatmap_data, geometry=heatmap_data.geometry)
    heatmap_data["conversionRatio"] = heatmap_data["numBookings"] / heatmap_data["numOpportunities"]
    heatmap_data["benchmark"] = heatmap_data["acceptedProposalsNumber"] / heatmap_data["proposalsNumber"]
    heatmap_data["propertiesUsed"] = heatmap_data["numberPropertiesProposed"] / heatmap_data["numberProperties"]
    heatmap_data["propertiesConverted"] = heatmap_data["numberPropertiesAccepted"] / heatmap_data["numberPropertiesProposed"]
    heatmap_data["distrito"] = heatmap_data.distrito.astype(str)
    return heatmap_data
def savePropertiesToDBX(properties):
    """Write properties to a shapefile, zip its parts and upload to Dropbox.

    NOTE(review): the .prj projection sidecar is not bundled — confirm whether
    downstream consumers need it. ``ZipFile``, ``TOKEN`` and ``uploadDropbox``
    come from module scope elsewhere in the file.
    """
    properties.to_file("properties.shp")
    archive_name = "properties.zip"
    # Renamed the handle from ``zip`` — the original shadowed the builtin.
    with ZipFile(archive_name, "w") as archive:
        for part in ("properties.shp", "properties.cpg",
                     "properties.dbf", "properties.shx"):
            archive.write(part)
    uploadDropbox(TOKEN, archive_name, "/properties.zip")
def saveOpportunitiesToDBX(opportunities):
    """Persist the opportunities DataFrame as CSV and upload it to Dropbox.

    NOTE(review): ``TOKEN`` and ``uploadDropbox`` come from module scope
    elsewhere in the file.
    """
    local_file = "opportunities.csv"
    opportunities.to_csv(local_file)
    uploadDropbox(TOKEN, local_file, "/opportunities.csv")
def saveHeatmapToDBX(heatmap_data):
    """Write heatmap data to a shapefile, zip its parts and upload to Dropbox.

    NOTE(review): the .prj projection sidecar is not bundled — confirm whether
    downstream consumers need it. ``ZipFile``, ``TOKEN`` and ``uploadDropbox``
    come from module scope elsewhere in the file.
    """
    heatmap_data.to_file("heatmap.shp")
    archive_name = "heatmap.zip"
    # Renamed the handle from ``zip`` — the original shadowed the builtin.
    with ZipFile(archive_name, "w") as archive:
        for part in ("heatmap.shp", "heatmap.cpg",
                     "heatmap.dbf", "heatmap.shx"):
            archive.write(part)
    uploadDropbox(TOKEN, archive_name, "/heatmap.zip")
def saveProposalsToDBX(proposalsData):
    """Strip geometry, stringify district codes, write CSV and upload to Dropbox.

    Note: mutates the caller's DataFrame in place (the geometry column is
    dropped with inplace=True, matching the original behavior).
    NOTE(review): ``TOKEN`` and ``uploadDropbox`` come from module scope
    elsewhere in the file.
    """
    proposalsData.drop(columns=["geometry"], inplace=True)
    proposalsData["distrito"] = proposalsData.distrito.astype(str)
    local_file = "proposalsData.csv"
    proposalsData.to_csv(local_file)
    uploadDropbox(TOKEN, local_file, "/proposalsData.csv")