FADE

7.1 Desarrollo del pipeline ETL

def getPropertiesData():
    """Load properties located in Spain from Elasticsearch as a point GeoDataFrame.

    Scans the ``property`` index through the module-level ``es`` client and
    keeps only documents whose ``address.country`` equals ``"España"``. The
    nested ``address`` dict is flattened into ``province``, ``lon``, ``lat``
    and ``propertyCity`` columns, and the ES ``id`` field is renamed to
    ``propertyId``.

    Returns:
        GeoDataFrame whose columns include ``propertyId``, ``province``,
        ``lon``, ``lat``, ``propertyCity`` and a point ``geometry`` built
        from ``(lon, lat)``.
    """
    def addressField(address, key):
        # Rows without an address come back from pandas as NaN; map them to "".
        return address.get(key) if isinstance(address, dict) else ""

    def geoField(address, axis):
        # Same "" convention as addressField; a missing geoLocation yields None.
        if not isinstance(address, dict):
            return ""
        return (address.get("geoLocation") or {}).get(axis)

    search = Search(using=es, index="property").source(includes=[
        "address.locality",
        "address.administrativeAreaLevel2",
        "address.country",
        "id",
        "address.geoLocation",
    ])
    propertiesData = pd.DataFrame(hit.to_dict() for hit in search.scan())

    propertiesData["country"] = propertiesData["address"].apply(
        lambda a: addressField(a, "country"))
    propertiesData["province"] = propertiesData["address"].apply(
        lambda a: addressField(a, "administrativeAreaLevel2"))
    # .copy(): the assignments below must act on an independent frame, not on a
    # slice of the unfiltered one (avoids SettingWithCopyWarning).
    propertiesData = propertiesData[propertiesData["country"] == "España"].copy()
    # Expose the ES id as "propertyId", the key used when joining with DBX data.
    propertiesData = propertiesData.rename(columns={"id": "propertyId"})
    propertiesData["lon"] = propertiesData["address"].apply(lambda a: geoField(a, "lon"))
    propertiesData["lat"] = propertiesData["address"].apply(lambda a: geoField(a, "lat"))
    propertiesData["propertyCity"] = propertiesData["address"].apply(
        lambda a: addressField(a, "locality"))

    # NOTE(review): DataFrame.replace with plain-string keys only replaces cells
    # whose *entire* value equals a key (e.g. a cell that is exactly "á"); it does
    # not strip accents inside strings such as "Málaga". Kept as-is because
    # province names are later compared with the district table's NPRO column,
    # which may rely on accents being preserved - confirm before using regex=True.
    accentMap = {
        "Á": "A", "á": "a", "À": "A", "à": "a",
        "É": "E", "é": "e", "È": "E", "è": "e",
        "Í": "I", "í": "i", "Ì": "I", "ì": "i",
        "Ó": "O", "ó": "o", "Ò": "O", "ò": "o",
        "Ú": "U", "ú": "u", "Ù": "U", "ù": "u",
        "Ñ": "NY", "ñ": "ny", "Ü": "U", "ü": "u",
    }
    propertiesData = propertiesData.replace(accentMap)

    propertiesData = propertiesData.drop(["address", "country"], axis=1)
    # A list comprehension (unlike apply(axis=1)) also works on an empty frame.
    geometry = [Point(lon, lat)
                for lon, lat in zip(propertiesData["lon"], propertiesData["lat"])]
    return gpd.GeoDataFrame(propertiesData, geometry=geometry)
    
def getOpportunitiesData():
    """Load opportunities located in Spain from Elasticsearch as a point GeoDataFrame.

    Scans the indices matching ``deal*`` through the module-level ``es``
    client, keeps only documents whose ``address.country`` equals
    ``"España"`` and flattens the nested ``address`` dict into ``province``,
    ``lon`` and ``lat`` columns. The ES ``id`` and ``rentalRequestStage``
    fields are kept unchanged.

    Returns:
        GeoDataFrame whose columns include ``id``, ``rentalRequestStage``,
        ``province``, ``lon``, ``lat`` and a point ``geometry`` built from
        ``(lon, lat)``.
    """
    def addressField(address, key):
        # Rows without an address come back from pandas as NaN; map them to "".
        return address.get(key) if isinstance(address, dict) else ""

    def geoField(address, axis):
        # Same "" convention as addressField; a missing geoLocation yields None.
        if not isinstance(address, dict):
            return ""
        return (address.get("geoLocation") or {}).get(axis)

    search = Search(using=es, index="deal*").source(includes=[
        "address.locality",
        "address.administrativeAreaLevel2",
        "rentalRequestStage",
        "address.country",
        "id",
        "address.geoLocation",
    ])
    opportunitiesData = pd.DataFrame(hit.to_dict() for hit in search.scan())

    opportunitiesData["country"] = opportunitiesData["address"].apply(
        lambda a: addressField(a, "country"))
    opportunitiesData["province"] = opportunitiesData["address"].apply(
        lambda a: addressField(a, "administrativeAreaLevel2"))
    # .copy(): the assignments below must act on an independent frame, not on a
    # slice of the unfiltered one (avoids SettingWithCopyWarning).
    opportunitiesData = opportunitiesData[opportunitiesData["country"] == "España"].copy()
    opportunitiesData["lon"] = opportunitiesData["address"].apply(lambda a: geoField(a, "lon"))
    opportunitiesData["lat"] = opportunitiesData["address"].apply(lambda a: geoField(a, "lat"))

    # NOTE(review): DataFrame.replace with plain-string keys only replaces cells
    # whose *entire* value equals a key (e.g. a cell that is exactly "á"); it does
    # not strip accents inside strings such as "Málaga". Kept as-is because
    # province names are later compared with the district table's NPRO column,
    # which may rely on accents being preserved - confirm before using regex=True.
    accentMap = {
        "Á": "A", "á": "a", "À": "A", "à": "a",
        "É": "E", "é": "e", "È": "E", "è": "e",
        "Í": "I", "í": "i", "Ì": "I", "ì": "i",
        "Ó": "O", "ó": "o", "Ò": "O", "ò": "o",
        "Ú": "U", "ú": "u", "Ù": "U", "ù": "u",
        "Ñ": "NY", "ñ": "ny", "Ü": "U", "ü": "u",
    }
    opportunitiesData = opportunitiesData.replace(accentMap)

    opportunitiesData = opportunitiesData.drop(["address", "country"], axis=1)
    # A list comprehension (unlike apply(axis=1)) also works on an empty frame.
    geometry = [Point(lon, lat)
                for lon, lat in zip(opportunitiesData["lon"], opportunitiesData["lat"])]
    return gpd.GeoDataFrame(opportunitiesData, geometry=geometry)
    
def getProposalsData():
    """Load finished proposals from Elasticsearch as a flat DataFrame.

    Scans the indices matching ``proposal*`` for documents with
    ``proposalStatus == "OK"`` and ``proposalStage`` either ``"ACCEPTED"`` or
    ``"REJECTED"``. Only rows with positive ``dateCheckIn`` and
    ``dateCheckOut`` (epoch milliseconds) are kept.

    Returns:
        DataFrame whose columns include ``proposalStage``, ``propertyId``,
        ``propertyCity``, ``province``, ``monthlyPrice`` (NaN when the
        proposal has no price dossier), ``beds`` and ``duration`` (whole days
        between check-in and check-out).
    """
    def nestedField(value, key, missing=""):
        # Absent nested objects come back from pandas as NaN; use `missing` instead.
        return value.get(key) if isinstance(value, dict) else missing

    statusQuery = Q("match", proposalStatus="OK")
    acceptedQuery = Q("match", proposalStage="ACCEPTED")
    rejectedQuery = Q("match", proposalStage="REJECTED")
    completeQuery = statusQuery & (acceptedQuery | rejectedQuery)

    search = Search(using=es, index="proposal*").source(includes=[
        "address.locality",
        "address.administrativeAreaLevel2",
        "address.postalCode",
        "dateCheckIn",
        "dateCheckOut",
        "monthlyPriceDossier.amount",
        "property.beds",
        "proposalStage",
        "proposalStatus",
        "property.id",
        "creationDate",
        "address.geoLocation",
    ]).query("bool", filter=completeQuery)
    proposalsData = pd.DataFrame(hit.to_dict() for hit in search.scan())

    # .copy(): the assignments below must act on an independent frame, not on a
    # slice of the unfiltered one (avoids SettingWithCopyWarning).
    proposalsData = proposalsData[(proposalsData["dateCheckIn"] > 0)
                                  & (proposalsData["dateCheckOut"] > 0)].copy()

    proposalsData["propertyId"] = proposalsData["property"].apply(
        lambda p: nestedField(p, "id"))
    proposalsData["propertyCity"] = proposalsData["address"].apply(
        lambda a: nestedField(a, "locality"))
    proposalsData["province"] = proposalsData["address"].apply(
        lambda a: nestedField(a, "administrativeAreaLevel2"))
    # Missing prices become NaN (not "") so they count as missing values, e.g.
    # for dropna(subset=["monthlyPrice"]) before computing medians.
    proposalsData["monthlyPrice"] = proposalsData["monthlyPriceDossier"].apply(
        lambda d: nestedField(d, "amount", np.nan))
    proposalsData["dateCheckIn"] = pd.to_datetime(proposalsData["dateCheckIn"], unit="ms")
    proposalsData["dateCheckOut"] = pd.to_datetime(proposalsData["dateCheckOut"], unit="ms")
    proposalsData["beds"] = proposalsData["property"].apply(lambda p: nestedField(p, "beds"))
    proposalsData["duration"] = (
        proposalsData["dateCheckOut"] - proposalsData["dateCheckIn"]
    ).dt.days

    proposalsData = proposalsData.drop([
        "property",
        "proposalStatus",
        "address",
        "monthlyPriceDossier",
        "creationDate",
        "dateCheckIn",
        "dateCheckOut",
    ], axis=1)

    # NOTE(review): DataFrame.replace with plain-string keys only replaces cells
    # whose *entire* value equals a key; it does not strip accents inside
    # strings such as "Málaga". Kept as-is - confirm before using regex=True.
    accentMap = {
        "Á": "A", "á": "a", "À": "A", "à": "a",
        "É": "E", "é": "e", "È": "E", "è": "e",
        "Í": "I", "í": "i", "Ì": "I", "ì": "i",
        "Ó": "O", "ó": "o", "Ò": "O", "ò": "o",
        "Ú": "U", "ú": "u", "Ù": "U", "ù": "u",
        "Ñ": "NY", "ñ": "ny", "Ü": "U", "ü": "u",
    }
    return proposalsData.replace(accentMap)
def assignDistrictsToProperties(propertiesData):
    """Assign a census district (CUDIS) and municipality (NMUN) to new properties.

    Properties whose ``propertyId`` already appears in ``getPropertiesFromDBX()``
    are skipped. For every remaining property, the first district of the same
    province (``NPRO == province``) from ``getDistrictsFromDBX()`` whose polygon
    contains the property's point supplies ``distrito``, ``municipio`` and the
    polygon, which becomes the row's geometry. Properties with no containing
    district keep NaN in those columns. Progress is printed every 100 rows.

    Args:
        propertiesData: GeoDataFrame with at least ``propertyId``,
            ``province`` and a point ``geometry`` column.

    Returns:
        The stored DBX properties concatenated (column-sorted) with the newly
        assigned ``propertyId``/``municipio``/``distrito``/``geometry`` rows,
        de-duplicated on ``propertyId`` (the first occurrence, i.e. the DBX
        row, wins).
    """
    propertiesGeometry = getPropertiesFromDBX()
    distritos = getDistrictsFromDBX()

    # Only process properties that are not stored in DBX yet.
    propertiesData = propertiesData[~propertiesData["propertyId"]
                                    .isin(propertiesGeometry["propertyId"])].reset_index()
    properties = propertiesData.copy()

    # Index district polygons by province once instead of re-filtering the whole
    # district table for every property. groupby keeps the original row order
    # inside each group, so "first containing district wins" is preserved.
    distritosPorProvincia = {
        provincia: list(zip(grupo["geometry"], grupo["CUDIS"], grupo["NMUN"]))
        for provincia, grupo in distritos.groupby("NPRO")
    }

    length = properties.shape[0]
    asignaciones = []
    for procesados, (provincia, point) in enumerate(
            zip(properties["province"], properties["geometry"]), start=1):
        asignacion = (np.nan, np.nan, np.nan)
        for geom, cudis, nmun in distritosPorProvincia.get(provincia, ()):
            if geom.contains(point):
                asignacion = (cudis, nmun, geom)
                break
        asignaciones.append(asignacion)
        if procesados % 100 == 0:
            print(f"{procesados} completados de {length}")

    # Assign whole columns at once. Writing through properties["col"].loc[i]
    # is chained assignment, which does not update `properties` under pandas
    # Copy-on-Write. dtype=object keeps CUDIS codes and polygons as-is.
    for columna, posicion in (("distrito", 0), ("municipio", 1), ("polygon", 2)):
        properties[columna] = pd.Series([a[posicion] for a in asignaciones],
                                        index=properties.index, dtype=object)

    # Replace the property point by the district polygon as the row geometry.
    properties.drop(columns=["geometry", "index"], inplace=True)
    properties.rename(columns={"polygon": "geometry"}, inplace=True)
    properties = gpd.GeoDataFrame(properties, geometry=properties["geometry"])
    properties = properties[["propertyId", "municipio", "distrito", "geometry"]]

    properties = pd.concat([propertiesGeometry, properties], sort=True)
    return properties.drop_duplicates(subset="propertyId")


def assignDistrictsToOpportunities(opportunitiesData):
    """Assign a census district (CUDIS) and municipality (NMUN) to new opportunities.

    Opportunities whose ``id`` is already in ``getOpportunitiesFromDBX(url)``
    are skipped. District polygons are read from the local shapefile
    ``coordenadasDistritos.shp`` and reprojected to EPSG:4326. For each
    remaining opportunity, the first district of the same province
    (``NPRO == province``) whose polygon contains the opportunity's point
    supplies ``distrito`` and ``municipio``; unmatched rows keep NaN.
    ``distrito`` is then converted to ``str`` and suffixed with ``"x"``.
    Progress is printed every 100 rows.

    Args:
        opportunitiesData: GeoDataFrame with at least ``id``, ``province``,
            ``lon``, ``lat`` and a point ``geometry`` column.

    Returns:
        The stored opportunities concatenated (column-sorted) with the newly
        assigned ones; ``geometry``, ``lon`` and ``lat`` are dropped from the
        new rows.
    """
    opportunitiesDistritos = getOpportunitiesFromDBX(url)
    # Only process opportunities that are not stored in DBX yet.
    opportunitiesData = opportunitiesData[~opportunitiesData["id"]
                                          .isin(opportunitiesDistritos["id"])].reset_index()
    opportunitiesData.drop(columns=["index"], inplace=True)

    distritos = gpd.GeoDataFrame.from_file("coordenadasDistritos.shp")
    distritos = distritos.to_crs(epsg=4326)
    # NOTE(review): NPRO spellings in the shapefile may differ from the ES
    # province names, in which case those provinces get no district. A possible
    # normalisation (not applied): "Araba/Álava" -> "Álava",
    # "Balears, Illes" -> "Illes Balears", "Castellón/Castelló" -> "Castellón",
    # "Coruña, A" -> "A Coruña", "Rioja, La" -> "La Rioja",
    # "Palmas, Las" -> "Las Palmas", "Valencia/Valéncia" -> "Valencia".

    # Index district polygons by province once; groupby keeps row order inside
    # each group, so "first containing district wins" is preserved.
    distritosPorProvincia = {
        provincia: list(zip(grupo["geometry"], grupo["CUDIS"], grupo["NMUN"]))
        for provincia, grupo in distritos.groupby("NPRO")
    }

    opportunities = opportunitiesData.copy()
    length = opportunities.shape[0]
    codigos, municipios = [], []
    for procesados, (provincia, point) in enumerate(
            zip(opportunities["province"], opportunities["geometry"]), start=1):
        cudisEncontrado, nmunEncontrado = np.nan, np.nan
        for geom, cudis, nmun in distritosPorProvincia.get(provincia, ()):
            if geom.contains(point):
                cudisEncontrado, nmunEncontrado = cudis, nmun
                break
        codigos.append(cudisEncontrado)
        municipios.append(nmunEncontrado)
        if procesados % 100 == 0:
            print(f"{procesados} completados de {length}")

    # Assign whole columns at once. Writing through opportunities["col"].loc[i]
    # is chained assignment, which does not update the frame under pandas
    # Copy-on-Write. dtype=object keeps CUDIS codes as-is (no float coercion).
    opportunities["distrito"] = pd.Series(codigos, index=opportunities.index, dtype=object)
    opportunities["municipio"] = pd.Series(municipios, index=opportunities.index, dtype=object)

    opportunities.drop(columns=["geometry", "lon", "lat"], inplace=True)
    # NOTE(review): the "x" suffix is presumably there to keep district codes as
    # text downstream; with pandas 2.x unmatched rows become "nanx" - confirm
    # both are intended.
    opportunities["distrito"] = opportunities["distrito"].astype(str) + "x"
    return pd.concat([opportunitiesDistritos, opportunities], sort=True)
    
def assignDistrictsToProposals(proposalsData):
    """Attach the stored per-property data (e.g. district) to each proposal.

    Left-joins ``proposalsData`` with ``getPropertiesFromDBX()`` on
    ``propertyId``; proposals without a stored property keep NaN in the joined
    columns. The result is wrapped as a GeoDataFrame using its ``geometry``
    column.
    """
    storedProperties = getPropertiesFromDBX()
    enriched = proposalsData.merge(storedProperties, how="left", on="propertyId")
    return gpd.GeoDataFrame(enriched, geometry=enriched.geometry)
def groupByDistricts(proposalsData):
    """Aggregate proposal, property and opportunity metrics per district.

    Args:
        proposalsData: proposals enriched with district data (presumably the
            output of ``assignDistrictsToProposals``); must contain
            ``distrito``, ``rooms``, ``monthlyPrice``, ``proposalStage``,
            ``propertyId``, ``duration``, ``geometry``, ``municipio`` and
            ``propertyCity``.

    Returns:
        GeoDataFrame with one row per district present in ``proposalsData``,
        ordered by ``proposalsNumber`` descending. It holds the proposal,
        property, opportunity and booking counts, the median ``duration``,
        the median ``monthlyPrice`` per ``rooms`` value (one column per value),
        the district geometry/municipality/city, and the ratios
        ``conversionRatio``, ``benchmark``, ``propertiesUsed`` and
        ``propertiesConverted`` (NaN where a count is missing). ``distrito``
        is returned as ``str``.
    """
    # NOTE(review): assignDistrictsToOpportunities calls getOpportunitiesFromDBX(url)
    # while this call passes no argument - confirm the helper's signature.
    opportunities = getOpportunitiesFromDBX()
    # Stored properties with their district, for the per-district property count.
    properties = getPropertiesFromDBX()

    accepted = proposalsData[proposalsData["proposalStage"] == "ACCEPTED"]

    pricePerRooms = (
        proposalsData.dropna(subset=["monthlyPrice"])
        .groupby(["distrito", "rooms"])["monthlyPrice"].median()
        .reset_index()
        .pivot(index="distrito", columns="rooms", values="monthlyPrice")
        .reset_index()
    )

    # Left table of the merge chain: its descending order is the output order.
    proposalsNumber = (
        proposalsData.groupby("distrito")["proposalStage"].count()
        .reset_index()
        .sort_values(by="proposalStage", ascending=False)
        .rename(columns={"proposalStage": "proposalsNumber"})
    )

    propertiesProposedPerDistrict = (
        proposalsData.groupby("distrito")["propertyId"].nunique()
        .reset_index()
        .rename(columns={"propertyId": "numberPropertiesProposed"})
    )

    propertiesAcceptedPerDistrict = (
        accepted.groupby("distrito")["propertyId"].nunique()
        .reset_index()
        .rename(columns={"propertyId": "numberPropertiesAccepted"})
    )

    propertiesPerDistrict = (
        properties[["distrito", "propertyId"]].groupby("distrito").count()
        .reset_index()
        .rename(columns={"propertyId": "numberProperties"})
    )

    acceptedProposalsNumber = (
        accepted.groupby("distrito")["proposalStage"].count()
        .reset_index()
        .rename(columns={"proposalStage": "acceptedProposalsNumber"})
    )

    durationPerDistrict = proposalsData.groupby("distrito")["duration"].median().reset_index()

    opportunitiesPerDistrict = (
        opportunities.groupby("distrito")["id"].count()
        .reset_index()
        .rename(columns={"id": "numOpportunities"})
    )

    # Rental-request stages from which an opportunity is counted as a booking.
    bookingStages = [
        "S030_CONTRACT_INVOICE_MANAGMENT",
        "S040_CHECK_IN",
        "S050_RENTAL_CONFIRMED",
        "S060_CONVERSION_HISTORY",
    ]
    bookingsPerDistrict = (
        opportunities[opportunities["rentalRequestStage"].isin(bookingStages)]
        .groupby("distrito")["id"].count()
        .reset_index()
        .rename(columns={"id": "numBookings"})
    )

    distritos = (
        proposalsData[["distrito", "geometry", "municipio", "propertyCity"]]
        .drop_duplicates(subset="distrito")
    )

    # Left merges keep the row order of proposalsNumber; the right-hand tables
    # have one row per district, so no rows are duplicated.
    heatmap_data = proposalsNumber
    for table in (acceptedProposalsNumber, propertiesPerDistrict,
                  propertiesProposedPerDistrict, propertiesAcceptedPerDistrict,
                  durationPerDistrict, pricePerRooms, opportunitiesPerDistrict,
                  bookingsPerDistrict, distritos):
        heatmap_data = heatmap_data.merge(table, how="left", on="distrito")

    heatmap_data = gpd.GeoDataFrame(heatmap_data, geometry=heatmap_data["geometry"])
    heatmap_data["conversionRatio"] = (heatmap_data["numBookings"]
                                       / heatmap_data["numOpportunities"])
    heatmap_data["benchmark"] = (heatmap_data["acceptedProposalsNumber"]
                                 / heatmap_data["proposalsNumber"])
    heatmap_data["propertiesUsed"] = (heatmap_data["numberPropertiesProposed"]
                                      / heatmap_data["numberProperties"])
    heatmap_data["propertiesConverted"] = (heatmap_data["numberPropertiesAccepted"]
                                           / heatmap_data["numberPropertiesProposed"])

    heatmap_data["distrito"] = heatmap_data["distrito"].astype(str)
    return heatmap_data